作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我需要实现一种生产者/消费者方案,出于性能原因,消费者尝试在一批中处理许多工作项(每个工作项都会耗尽工作队列)。
目前,我只是创建固定数量的相同工作人员,它们在循环中的同一队列上工作。由于其中一些可能会死亡,我需要负责更换它们。
我很想使用固定线程池来管理线程替换,但我的情况没有映射到Executor
方案,因为生产者和消费者所需的粒度不匹配——只有消费者可以收集合适批处理的作品。
当我的工作项无法表示为 Runnables/Callables 时,我可以选择哪些管理(固定大小)线程池?
(或者,我可以以某种方式保留批量生产的工作项的要求,并且仍然能够使用执行器服务吗?)
最佳答案
一种方法是将生产者/消费者设置为可运行的,并使用 BlockingQueue 来在其之间传递任何数据。
例如,下面是生产者的简单实现,它向队列
生成String
项,以及批量读取项的消费者:
class ProducerConsumerPing {
private static final class PingProducer implements Runnable {
private final BlockingQueue<String> queue;
PingProducer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
while (true) {
queue.offer("ping");
}
}
}
private static final class PingConsumer implements Runnable {
private final BlockingQueue<String> queue;
private final int batchSize;
PingConsumer(BlockingQueue<String> queue, int batchSize) {
this.queue = queue;
this.batchSize = batchSize;
}
public void run() {
while (true) {
List<String> batch = new ArrayList<>();
queue.drainTo(batch, batchSize);
System.out.println("Consumed: " + batch);
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService producers = Executors.newFixedThreadPool(10);
ExecutorService consumers = Executors.newFixedThreadPool(10);
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
for (int i = 0; i < 10; i++) {
producers.submit(new PingProducer(queue));
}
for (int i = 0; i < 10; i++) {
consumers.submit(new PingConsumer(queue, 10));
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
}
注释:
在示例中,我使用String
作为工作项
,但当然您可以将任何Object
放入队列
消费者批处理是通过使用BlockingQueue.drainTo(Collection, int)来实现的
关于java - 如何使用自动线程管理在Java中实现生产者/消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58170541/
我是一名优秀的程序员,十分优秀!