gpt4 book ai didi

java - 生产者/消费者工作队列

转载 作者:搜寻专家 更新时间:2023-10-30 21:46:29 24 4
gpt4 key购买 nike

我正在努力寻找实现我的处理管道的最佳方式。

我的生产者将工作提供给 BlockingQueue。在消费者端,我轮询队列,将我得到的内容包装在 Runnable 任务中,然后将其提交给 ExecutorService。

while (!isStopping())
{
String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
if (work == null)
{
break;
}
executorService.execute(new Worker(work)); // needs to block if no threads!
}

这并不理想;当然,ExecutorService 有自己的队列,所以真正发生的是我总是完全耗尽我的工作队列并填充任务队列,任务队列会随着任务完成而慢慢清空。

我意识到我可以在生产者端对任务进行排队,但我真的不想这样做——我喜欢我的工作队列的间接/隔离是哑字符串;他们会发生什么真的与制作人无关。恕我直言,强制生产者对 Runnable 或 Callable 进行排队会破坏抽象。

但我确实希望共享工作队列代表当前处理状态。如果消费者没有跟上,我希望能够阻止生产者。

我很想使用 Executors,但我觉得我在反对他们的设计。我可以喝一部分 Kool-ade,还是必须一饮而尽?我抗拒排队任务是不是脑子错了? (我怀疑我可以将 ThreadPoolExecutor 设置为使用 1-task 队列并覆盖它的执行方法来阻止而不是 reject-on-queue-full,但这感觉很恶心。)

建议?

最佳答案

I want the shared work queue to represent the current processing state.

尝试使用共享 BlockingQueue并有一个工作线程池从队列中取出工作项。

I want to be able to block the producers if the consumers aren't keeping up.

两者都是ArrayBlockingQueueLinkedBlockingQueue支持有界队列,这样它们将在满时阻塞。使用阻塞 put()方法确保生产者在队列已满时被阻塞。

这是一个粗略的开始。您可以调整工作人员数量和队列大小:

public class WorkerTest<T> {

private final BlockingQueue<T> workQueue;
private final ExecutorService service;

public WorkerTest(int numWorkers, int workQueueSize) {
workQueue = new LinkedBlockingQueue<T>(workQueueSize);
service = Executors.newFixedThreadPool(numWorkers);

for (int i=0; i < numWorkers; i++) {
service.submit(new Worker<T>(workQueue));
}
}

public void produce(T item) {
try {
workQueue.put(item);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}


private static class Worker<T> implements Runnable {
private final BlockingQueue<T> workQueue;

public Worker(BlockingQueue<T> workQueue) {
this.workQueue = workQueue;
}

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
T item = workQueue.take();
// Process item
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}

关于java - 生产者/消费者工作队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/2233561/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com