- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我有一个 PriorityBlockingQueue
。单个线程一次从该队列中消费一条消息并对其进行处理。其他几个线程正在将消息插入队列。生产者线程为他们提交的每条消息分配一个完整的优先级。静态 AtomicLong
用于为每条消息分配一个唯一的、单调递增的 ID。队列的Comparator
先按这个优先级对消息排序,然后同等优先级的消息按ID排序(最低ID在前)。
问题:有时一个生产者会提交大量消息。这会让其他生产者无法处理他们的消息。我想做的是让生产者之间的消费者轮询以获取同等优先级的消息(同时仍按提交顺序处理单个生产者的同等优先级消息)。但我不知道如何编写 Comparator
来执行此操作。
我考虑的另一种选择是为每个生产者设置一个单独的队列。但是,我认为这行不通,因为我不知道单线程在多个队列上等待的任何方式。
最佳答案
我觉得为每个生产者使用一个 Queue
来实现它更直接。一个线程不能等待多个 Queue
,但您可以将所有 Queue
组合到一个帮助程序类中,这样就不需要了。
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;
public class RoundRobin<P, E> {
private final Lock lock = new ReentrantLock();
private final Condition added = lock.newCondition();
@GuardedBy("lock") private final Map<P, Queue<E>> queues = new LinkedHashMap<>();
public boolean add(P producer, E item) {
lock.lock();
try {
if (!queues.containsKey(producer)) {
queues.put(producer, new PriorityBlockingQueue<>());
}
added.signalAll();
return queues.get(producer).add(item);
} finally {
lock.unlock();
}
}
public Iterator<E> roundRobinIterator() {
return new Iterator<E>() {
private Iterator<? extends Queue<E>> i = null;
private boolean singlePass = true;
@Override
public boolean hasNext() {
return true;
}
@Override
public E next() {
lock.lock();
try {
while (true) {
if (i == null || !i.hasNext()) {
i = queues.values().iterator();
singlePass = true;
}
while (i.hasNext()) {
Queue<E> q = i.next();
if (!q.isEmpty()) {
if (singlePass) {
// copy the iterator to prevent
// ConcurrentModificationExceptions
singlePass = false;
i = copy(i);
}
return q.poll();
}
}
if (singlePass) {
// If singlePass is true then we just checked every
// queue and they were all empty.
// Wait for another element to be added.
added.await();
}
}
} catch (InterruptedException e) {
throw new NoSuchElementException(e.getMessage());
} finally {
lock.unlock();
}
}
private <T> Iterator<? extends T> copy(Iterator<? extends T> i) {
List<T> copy = new ArrayList<>();
while (i.hasNext()) {
copy.add(i.next());
}
return copy.iterator();
}
};
}
}
关于java - 如何实现 PriorityBlockingQueue 的循环顺序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27737781/
我使用 PriorityBlockingQueue 来维护对象列表,这些对象的顺序由比较器决定。我的要求如下:首先,我将N对象添加到队列中,队列用它来维护有序列表。后来,我更改了已添加到队列中的对象中
在PriorityBlockingQueue的规范中,它说: While this queue is logically unbounded, attempted additions may fail
我有一个PriorityBlockingQueue如下: BlockingQueue robbleListQueue = new PriorityBlockingQueue(); Robble实现Co
我有一个包含元素列表的 PriorityBlockingQueue。我已经实现了 Comparable 接口(interface)并覆盖了 compareTo() 以定义哪个元素小于、等于或大于其他元
我正在使用 PriorityBlockingQueue 来存储功能的 tf-idf 分数。由于这部分是多线程的,我使用 synchronized block 来处理并发性: long ticAdd =
我正在尝试编写一个返回 CompletableFuture 的单线程执行器当任务被调度并基于 PriorityBlockingQueue 执行任务时. 我的任务如下所示: public inter
如果某个任务已经在阻塞队列中(假设轮询已满)并且我现在希望它具有更高的优先级,如何更改它的优先级? 例如:实现这个答案 Specify task order execution in Java 我将线
我有一个应用程序,它从多个序列化对象日志中读取对象并将它们交给另一个类进行处理。我的问题集中在如何高效、干净地读取对象并将其发送出去。 代码是从应用程序的旧版本中提取的,但我们最终保持原样。直到上周才
根据Javadocs , PriorityBlockingQueue 不保证具有相同优先级的元素的排序。他们建议使用辅助键 (sequenceNumber) 来强制执行特定的排序(例如 FIFO)。
我一直在阅读 source code of PriorityBlockingQueue在 Java 中,我想知道: 为什么 tryGrow() 方法释放在 offer() 方法中获取的锁,只是为了非阻
我有一个 PriorityBlockingQueue。单个线程一次从该队列中消费一条消息并对其进行处理。其他几个线程正在将消息插入队列。生产者线程为他们提交的每条消息分配一个完整的优先级。静态 Ato
PriorityBlockingQueue 有多少把锁?take 和put 操作是否同步?我找不到有关此类队列的太多信息。我使用的是单线程 PriorityQueue。 最佳答案 How many l
DelayQueue public class DelayQueue extends AbstractQueue implements BlockingQueue 1、 Delayed元
关闭。此题需要details or clarity 。目前不接受答案。 想要改进这个问题吗?通过 editing this post 添加详细信息并澄清问题. 已关闭10 年前。 Improve th
我正在尝试为我的游戏制作一个简单的消息管理器。但是,我收到以下异常: Exception in thread "main" Exception in thread "Thread-4" java.la
public class CompareOrder implements Comparator { @Override public int compare(T left, T rig
我在自定义线程拉取中使用 PriorityBlockingQueue 时遇到问题,其中 poll 方法导致 NullPointerException。使用此设置时 int POOL_SIZE = 5;
首先:我已经阅读了以下两个问题及其可能的解决方案: ScheduledThreadPoolExecutors and custom queue Java Executors: how can I se
我在这个例子中使用 PriorityBlockingQueue 实现了我的 ThreadPoolExecutor: https://stackoverflow.com/a/12722648/22067
我进行了很多搜索,但找不到解决问题的方法。 我有自己的类 BaseTask,它使用 ThreadPoolExecutor 来处理任务。我想要任务优先级,但是当我尝试使用 PriorityBlockin
我是一名优秀的程序员,十分优秀!