gpt4 book ai didi

java - 如何实现 PriorityBlockingQueue 的循环顺序?

转载 作者:搜寻专家 更新时间:2023-11-01 02:24:43 24 4
gpt4 key购买 nike

我有一个 PriorityBlockingQueue。单个线程一次从该队列中消费一条消息并对其进行处理。其他几个线程正在将消息插入队列。生产者线程为他们提交的每条消息分配一个完整的优先级。静态 AtomicLong 用于为每条消息分配一个唯一的、单调递增的 ID。队列的Comparator先按这个优先级对消息排序,然后同等优先级的消息按ID排序(最低ID在前)。

问题:有时一个生产者会提交大量消息。这会让其他生产者无法处理他们的消息。我想做的是让生产者之间的消费者轮询以获取同等优先级的消息(同时仍按提交顺序处理单个生产者的同等优先级消息)。但我不知道如何编写 Comparator 来执行此操作。

我考虑的另一种选择是为每个生产者设置一个单独的队列。但是,我认为这行不通,因为我不知道单线程在多个队列上等待的任何方式。

最佳答案

我觉得为每个生产者使用一个 Queue 来实现它更直接。一个线程不能等待多个 Queue,但您可以将所有 Queue 组合到一个帮助程序类中,这样就不需要了。

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import javax.annotation.concurrent.GuardedBy;

public class RoundRobin<P, E> {
private final Lock lock = new ReentrantLock();
private final Condition added = lock.newCondition();

@GuardedBy("lock") private final Map<P, Queue<E>> queues = new LinkedHashMap<>();

public boolean add(P producer, E item) {
lock.lock();
try {
if (!queues.containsKey(producer)) {
queues.put(producer, new PriorityBlockingQueue<>());
}

added.signalAll();
return queues.get(producer).add(item);
} finally {
lock.unlock();
}
}

public Iterator<E> roundRobinIterator() {
return new Iterator<E>() {
private Iterator<? extends Queue<E>> i = null;
private boolean singlePass = true;

@Override
public boolean hasNext() {
return true;
}

@Override
public E next() {
lock.lock();
try {
while (true) {
if (i == null || !i.hasNext()) {
i = queues.values().iterator();
singlePass = true;
}

while (i.hasNext()) {
Queue<E> q = i.next();
if (!q.isEmpty()) {
if (singlePass) {
// copy the iterator to prevent
// ConcurrentModificationExceptions
singlePass = false;
i = copy(i);
}
return q.poll();
}
}

if (singlePass) {
// If singlePass is true then we just checked every
// queue and they were all empty.
// Wait for another element to be added.
added.await();
}
}
} catch (InterruptedException e) {
throw new NoSuchElementException(e.getMessage());
} finally {
lock.unlock();
}
}

private <T> Iterator<? extends T> copy(Iterator<? extends T> i) {
List<T> copy = new ArrayList<>();
while (i.hasNext()) {
copy.add(i.next());
}
return copy.iterator();
}
};
}
}

关于java - 如何实现 PriorityBlockingQueue 的循环顺序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27737781/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com