gpt4 book ai didi

java - 同一个 ExecutorService 中的多个生产者和单个消费者

转载 作者:行者123 更新时间:2023-11-30 08:36:25 24 4
gpt4 key购买 nike

问题陈述:从无限的整数流中识别两个连续的整数,其中这些整数由多个生产者生成,但当相同的数字再次重复时,单个消费者会发出警报。

我有多个 Producer 和一个 Consumer。如果我将 Consumer 提交给相同的 ExecutorService,Consumer 不会启动。但是,如果我在单独的线程中运行消费者,消费者线程会按预期启动。

代码:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Iterator;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

public class FixedBlockingQueue {
final BlockingQueue<Integer> queue;
private int capacity;

public FixedBlockingQueue(int capacity){
super();
this.capacity = capacity;
queue = new ArrayBlockingQueue<Integer>(capacity);
System.out.println("Capactiy:"+this.capacity);
}
public void addElement(Integer element){
try{
queue.put(element);
}catch(Exception err){
err.printStackTrace();
}
}
public void startThreads(){
ExecutorService es = Executors.newFixedThreadPool(1);
for ( int i =0; i < 10; i++){
es.submit(new MyProducer(this));
}
//es.submit(new MyConsumer(queue));
new Thread(new MyConsumer(this)).start();
}
public BlockingQueue<Integer> getQueue(){
return queue;
}
public static void main(String args[]){
FixedBlockingQueue f = new FixedBlockingQueue(1);
f.startThreads();
}
}

class MyProducer implements Runnable{

private FixedBlockingQueue queue;
public MyProducer(FixedBlockingQueue queue){
this.queue = queue;
}
public void run(){
for ( int i=1; i< 5; i++){
queue.addElement(new Integer(i));
System.out.println("adding:"+i);
}
}
}

class MyConsumer implements Runnable{
private BlockingQueue<Integer> queue;
Integer firstNumber = 0;
private final ReentrantLock lock = new ReentrantLock();

public MyConsumer(FixedBlockingQueue fQueue){
this.queue = fQueue.getQueue();
}
/* TODO : Compare two consecutive integers in queue are same or not*/
public void run(){
Integer secondNumber = 0;
while ( true){
try{
lock.lock();
System.out.println("queue size:"+queue.size());
if ( queue.size() > 0) {
secondNumber = queue.remove();
System.out.println("Removed:"+secondNumber);
System.out.println("Numbers:Num1:Num2:"+firstNumber+":"+secondNumber);
if ( firstNumber.intValue() == secondNumber.intValue()){
System.out.println("Numbers matched:"+firstNumber);
}
firstNumber = secondNumber;
}
Thread.sleep(1000);
}catch(Exception err){
err.printStackTrace();
}finally{
lock.unlock();
}
}
}
}

输出:

Capactiy:1
adding:1

如果我从

更改代码
es.submit(new MyConsumer(queue));
//new Thread(new MyConsumer(queue)).start();

//es.submit(new MyConsumer(queue));
new Thread(new MyConsumer(queue)).start();

消费者线程正常启动。

输出:

Capactiy:1
adding:1
queue size:1
Removed:1
Numbers:Num1:Num2:0:1
adding:2
queue size:1
Removed:2
Numbers:Num1:Num2:1:2
adding:3
queue size:1
Removed:3
Numbers:Num1:Num2:2:3
adding:4
queue size:1
Removed:4
Numbers:Num1:Num2:3:4
adding:1
queue size:1
Removed:1
Numbers:Num1:Num2:4:1
adding:2
queue size:1
Removed:2
adding:3
Numbers:Num1:Num2:1:2
queue size:1
Removed:3
Numbers:Num1:Num2:2:3

第一种方法:

我知道这个数字不会被消费者消耗,但理想情况下它不应该阻止其他 Producer 任务的提交。

如果是这样的话,用ExecutorService代替简单的Threads是不是能100%实现?

最佳答案

您创建了一个包含单个线程和一个固定容量为 1 的 BlockingQueue 的线程池。然后您向池中提交了三个任务:前两个分别尝试将五个值放入队列,然后是一个在任何值可用时使值出队。

因为你的固定大小池只有一个线程,所以你提交给它的任务将按顺序运行,而不是并行。你先提交一个生产者任务,所以它先运行。但是一旦它排入第一个数字,它就无法进行任何进一步的处理,因为队列已满。并且队列将永远保持满,因为生产者任务必须在池线程可用于另一个任务(例如消费者)之前完成。

我不确定您为什么为此使用线程池,因为直接进行线程管理并不难,特别是因为您的任务已经实现了 Runnable。但是,如果您确实使用了池,那么请确保其中有足够的线程来同时容纳所有任务。

另请注意,BlockingQueue 实现应该是线程安全的,标准库提供的所有实现确实如此。因此,您不需要在 addElement() 中执行自己的锁定。此外,如果您确实需要执行自己的锁定,那么您不仅需要在将元素入队时执行此操作,还需要在将它们出队时执行此操作。

此外,您让生产者任务通过 FixedBlockingQueue 实例间接向底层队列添加元素,但让消费者任务直接进入底层队列,这实在是太奇怪了。

FixedBlockingQueue 类的名称选择不当,因为它暗示该类实现了 BlockingQueue,但实际上该类并未实现。

关于java - 同一个 ExecutorService 中的多个生产者和单个消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37795707/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com