gpt4 book ai didi

java - Java 中最佳队列消费者实现

转载 作者:行者123 更新时间:2023-12-01 10:29:40 25 4
gpt4 key购买 nike

美好的一天!

我想创建一个 ExecutorService 消费者,用于从队列中获取数据并在服务器端使用它。这个想法是 - 我时不时地轮询队列,如果我发现它不为空,我会用 N 个线程(比如说 5 个)启动 ExecutorService。然后我 w8 while 队列将为空并关闭线程。再次 - 轮询数据队列......这个算法可以吗?或者可能有一些现成的实现/框架来完成此类任务?

我发现了 ConcurrentQueue cusumers 的实现:

public class ConcurrentQueueClient implements Runnable {

private Queue<String> concurrentQueue;

public ConcurrentQueueClient(Queue concurrentQueue) {
this.concurrentQueue = concurrentQueue;
}

public void run() {
boolean stopCondition = (concurrentQueue.size() == 0);

while (!stopCondition) {
for (int i = 0; i < concurrentQueue.size(); i++) {
System.out.println("Client dequeue item "
+ concurrentQueue.poll());

}
stopCondition = (concurrentQueue.size() == 0);
}

System.out.println("Client thread exiting...");
}
}

并以这种方式测试它:

 Queue<String> queue = new ConcurrentLinkedQueue<String>();
ExecutorService consumers = null;
while(true) {

if(queue.size() != 0) {
consumers = Executors.newFixedThreadPool(100);
for (int i = 0; i < 5; i++) {
ConcurrentQueueClient client = new ConcurrentQueueClient(queue);
consumers.execute(client);
}
}

while (queue.size() != 0) {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}

}

consumers.shutdown();
try {
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}


}

最佳答案

重新开始。将字符串包装在可调用或可运行中,并将它们排队到执行器服务。

如果要处理的数据集是有限的,那么可以让主线程像以前一样调用consumer.shutdown()和consumer.awaitTermination(...),但没有 sleep 循环。如果您要无限期地从队列中处理,则在服务关闭之前不要使用 shutdown()。

如果您没有有限的阻塞队列(没有任何东西可以阻塞queue.put()),您也会面临内存问题。 ArrayBlockingQueue 可以在创建时提供给执行程序服务(请参阅 ThreadPoolExecutor(...) )

执行器服务的线程按照设计对任务队列进行检查(queue.take())。尽量避免轮询,它会浪费CPU。始终尝试根据可重入锁的条件等待/通知(或等待/发出信号)(这一切都在执行器服务代码中为您处理)

关于java - Java 中最佳队列消费者实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35156439/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com