gpt4 book ai didi

java - 如何使用自动线程管理在Java中实现生产者/消费者

转载 作者:行者123 更新时间:2023-12-02 09:28:27 30 4
gpt4 key购买 nike

我需要实现一种生产者/消费者方案,出于性能原因,消费者尝试在一批中处理许多工作项(每个工作项都会耗尽工作队列)。

目前,我只是创建固定数量的相同工作人员,它们在循环中的同一队列上工作。由于其中一些可能会死亡,我需要负责更换它们。

我很想使用固定线程池来管理线程替换,但我的情况没有映射到Executor方案,因为生产者和消费者所需的粒度不匹配——只有消费者可以收集合适批处理的作品。

当我的工作项无法表示为 Runnables/Callables 时,我可以选择哪些管理(固定大小)线程池?

(或者,我可以以某种方式保留批量生产的工作项的要求,并且仍然能够使用执行器服务吗?)

最佳答案

一种方法是将生产者/消费者设置为可运行的,并使用 BlockingQueue 来在其之间传递任何数据。

例如,下面是生产者的简单实现,它向队列生成String项,以及批量读取项的消费者:

class ProducerConsumerPing {
private static final class PingProducer implements Runnable {
private final BlockingQueue<String> queue;

PingProducer(BlockingQueue<String> queue) {
this.queue = queue;
}

public void run() {
while (true) {
queue.offer("ping");
}
}
}

private static final class PingConsumer implements Runnable {

private final BlockingQueue<String> queue;
private final int batchSize;

PingConsumer(BlockingQueue<String> queue, int batchSize) {
this.queue = queue;
this.batchSize = batchSize;
}

public void run() {
while (true) {
List<String> batch = new ArrayList<>();
queue.drainTo(batch, batchSize);
System.out.println("Consumed: " + batch);
}
}
}

public static void main(String[] args) throws InterruptedException {
ExecutorService producers = Executors.newFixedThreadPool(10);
ExecutorService consumers = Executors.newFixedThreadPool(10);
BlockingQueue<String> queue = new LinkedBlockingQueue<>();

for (int i = 0; i < 10; i++) {
producers.submit(new PingProducer(queue));
}

for (int i = 0; i < 10; i++) {
consumers.submit(new PingConsumer(queue, 10));
}

producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
}

注释:

关于java - 如何使用自动线程管理在Java中实现生产者/消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58170541/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com