java - AtomicBoolean not updated

Reposted. Author: 行者123. Last updated: 2023-12-02 11:24:03

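I am implementing a standard producer-consumer program in which the producer stops after producing 200 products. To signal this, the producer puts -1 into the BlockingQueue, which otherwise always contains positive integers. My consumer is implemented as follows: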

public class Consumer implements Runnable{
    private BlockingQueue<Integer> queue;
    private AtomicBoolean isProducerClosed = new AtomicBoolean(false);

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {

        try {
            Thread.sleep(50);

            while(!isProducerClosed.get()) {
                try {
                    Integer value = queue.take();

                    if ((value.intValue() == -1)){
                        System.out.println(Thread.currentThread().getName() + " Encountered -1. Signal to shutdown consumers.");
                        //isProducerClosed.set(true);
                        isProducerClosed.compareAndSet(false, true);
                        break;
                    }

                    System.out.println(Thread.currentThread().getName() + " consuming : " + value);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}

Alternative consumer logic (still runs into the same problem):

@Override
public void run() {

    while(true) {
        try {
            Thread.sleep(50);
            Integer value = null;

            synchronized (this) {
                if(!isProducerClosed.get()) {
                    value = queue.take();
                    if ((value.intValue() == -1)) {
                        System.out.println(Thread.currentThread().getName() + " Encountered -1. Signal to shutdown consumers.");
                        isProducerClosed.set(true);
                        isProducerClosed = isProducerClosed;
                        System.out.println("Current value of isProducerClosed : " + isProducerClosed.get());
                    }
                }
            }

            if (!isProducerClosed.get()) {
                System.out.println(Thread.currentThread().getName() + " consuming : " + value);
            } else {
                break;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

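When I run this, my consumer threads get stuck on queue.take(), as if they were waiting for a product to become available in the queue. One possibility is that all consumer threads checked the condition isProducerClosed.get() at the same time, entered the while loop, and called queue.take(). Is this assumption correct? If so, is there a way to implement this without the low-level synchronized keyword? I tried the same thing with a volatile boolean variable and the result was exactly the same. Only after making the variable static did I see all consumers terminate after encountering -1 in the queue (since the variable is now owned by the class).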

My calling code:

public class ProducerConsumer {

    private BlockingQueue<Integer> sharedQueue = new ArrayBlockingQueue<Integer>(10);

    public void executeProducerConsumer(int producerCount, int consumerCount){
        ExecutorService executorService = Executors.newFixedThreadPool(3, Executors.defaultThreadFactory());

        for(int i = 0; i < producerCount; i++) {
            executorService.submit(new Producer(sharedQueue)); //producer
        }

        for(int i = 0; i < consumerCount; i++) {
            executorService.submit(new Consumer(sharedQueue)); //i-th consumer.
        }

        //initiates closure of threads after completion, in async manner.
        executorService.shutdown();

        //wait till all threads are done.
        try {
            executorService.awaitTermination(1, TimeUnit.HOURS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("The End.");
    }

}

Producer code:

public class Producer implements Runnable {
    private BlockingQueue<Integer> queue;
    private volatile int maxNumberOfItemsToProduce = 10;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        Random random = new Random();

        while(true){

            if(maxNumberOfItemsToProduce == 0) {
                try {
                    queue.put(-1);
                    System.out.println("Terminating Producer after producing max number of products.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                break;
            }

            try {
                queue.put(random.nextInt(300));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            maxNumberOfItemsToProduce--;
        }

    }

}

Console screenshot: (image not included)

Best answer

One possibility could be: all consumer threads checked the condition isProducerClosed.get() at the same time, entered the while loop, and called queue.take(). Is this a correct assumption? If yes, is there any way to implement this without using the low-level synchronized keyword? I tried the same thing with a volatile boolean variable and the result was exactly the same. Only after making that variable static was I able to see all consumers terminate after encountering -1 in the queue (as the variable is now class-owned).

Yes, that assumption is correct.

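The first problem: isProducerClosed is not shared between consumers. It is an instance field, so each Consumer object has its own AtomicBoolean, and setting it in one consumer is invisible to the others. It has to be shared so that when one consumer sets it, the other consumers see the new value. Making it static shares it, which is why things improved after that change.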
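To see the sharing issue in isolation, here is a minimal single-threaded sketch. The class names (SharedFlagDemo, PerInstanceConsumer, SharedFlagConsumer) are made up for illustration and are not part of the original code. It contrasts a flag created inside each consumer with one AtomicBoolean that the caller creates once and passes to every consumer, which is an alternative to making the field static.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class SharedFlagDemo {

    // Mirrors the question's Consumer: every instance creates its own flag.
    static class PerInstanceConsumer {
        private final AtomicBoolean isProducerClosed = new AtomicBoolean(false);

        void markClosed() { isProducerClosed.set(true); }
        boolean sawClose() { return isProducerClosed.get(); }
    }

    // Hypothetical variant: the caller creates one flag and injects it.
    static class SharedFlagConsumer {
        private final AtomicBoolean isProducerClosed;

        SharedFlagConsumer(AtomicBoolean isProducerClosed) {
            this.isProducerClosed = isProducerClosed;
        }

        void markClosed() { isProducerClosed.set(true); }
        boolean sawClose() { return isProducerClosed.get(); }
    }

    // Consumer a sets its own flag; consumer b holds a different object.
    static boolean perInstanceSeesClose() {
        PerInstanceConsumer a = new PerInstanceConsumer();
        PerInstanceConsumer b = new PerInstanceConsumer();
        a.markClosed();
        return b.sawClose();
    }

    // Both consumers hold the same AtomicBoolean, so b observes a's update.
    static boolean sharedSeesClose() {
        AtomicBoolean flag = new AtomicBoolean(false);
        SharedFlagConsumer a = new SharedFlagConsumer(flag);
        SharedFlagConsumer b = new SharedFlagConsumer(flag);
        a.markClosed();
        return b.sawClose();
    }

    public static void main(String[] args) {
        System.out.println("per-instance flag seen by other consumer: " + perInstanceSeesClose());
        System.out.println("shared flag seen by other consumer: " + sharedSeesClose());
    }
}
```

Passing the flag through the constructor keeps it scoped to one queue, whereas a static field is shared by every Consumer in the JVM. Which of the two fits better depends on how many independent queues the program runs.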

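The second problem: even once isProducerClosed is shared, several consumers can still end up calling queue.take() on an empty queue (one thread takes the last value, but another thread calls take() before the first has set isProducerClosed to true). You need to synchronize around this (for example with a double check).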
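Since the question asks for a way to avoid synchronized, one option is to stop blocking indefinitely: replace take() with the timed poll(timeout, unit) from BlockingQueue, so a consumer that lost the race wakes up, re-checks the shared flag, and exits. The sketch below assumes a single producer and a flag injected through the constructor. The names PollingConsumerDemo, PollingConsumer, SENTINEL, and runDemo are hypothetical, and the 50 ms timeout is an arbitrary choice.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class PollingConsumerDemo {
    static final int SENTINEL = -1; // same end-of-stream marker as in the question

    static class PollingConsumer implements Runnable {
        private final BlockingQueue<Integer> queue;
        private final AtomicBoolean closed;    // shared by all consumers
        private final AtomicInteger consumed;  // counts real items, for the demo only

        PollingConsumer(BlockingQueue<Integer> queue, AtomicBoolean closed, AtomicInteger consumed) {
            this.queue = queue;
            this.closed = closed;
            this.consumed = consumed;
        }

        @Override
        public void run() {
            try {
                while (!closed.get()) {
                    // Wait at most 50 ms, then loop to re-check the flag.
                    Integer value = queue.poll(50, TimeUnit.MILLISECONDS);
                    if (value == null) {
                        continue; // timed out on an empty queue
                    }
                    if (value == SENTINEL) {
                        closed.set(true); // tell the other consumers to stop
                        break;
                    }
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        }
    }

    // Returns the number of items consumed, or -1 if the consumers did not stop in time.
    static int runDemo(int items, int consumers) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        AtomicBoolean closed = new AtomicBoolean(false);
        AtomicInteger consumed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int i = 0; i < consumers; i++) {
            pool.submit(new PollingConsumer(queue, closed, consumed));
        }
        try {
            for (int i = 0; i < items; i++) {
                queue.put(i); // the calling thread acts as the single producer
            }
            queue.put(SENTINEL);
            pool.shutdown();
            return pool.awaitTermination(10, TimeUnit.SECONDS) ? consumed.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("consumed " + runDemo(25, 3) + " items");
    }
}
```

The trade-off is a short shutdown delay (up to one timeout per idle consumer) in exchange for not holding a lock around the queue.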

Sample code (the parts other than the consumer still contain bugs/races):

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TestClass {

    private BlockingQueue<Integer> sharedQueue = new ArrayBlockingQueue<Integer>(10);

    public static void main(String[] args) {
        TestClass t = new TestClass();
        t.executeProducerConsumer(3, 3);
    }

    public void executeProducerConsumer(int producerCount, int consumerCount) {
        ExecutorService executorService = Executors.newFixedThreadPool(producerCount + consumerCount, Executors.defaultThreadFactory());

        for (int i = 0; i < producerCount; i++) {
            executorService.submit(new Producer(sharedQueue)); //producer
        }

        for (int i = 0; i < consumerCount; i++) {
            executorService.submit(new Consumer(sharedQueue)); //i-th consumer.
        }

        //initiates closure of threads after completion, in async manner.
        executorService.shutdown();

        //wait till all threads are done.
        try {
            executorService.awaitTermination(1, TimeUnit.HOURS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("The End.");
    }

}

class Consumer implements Runnable {

    private BlockingQueue<Integer> queue;
    private static volatile boolean isProducerClosed = false; // make this static so that it is shared across consumers

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(50);
            Integer value;

            while (!isProducerClosed) {
                try {
                    synchronized (queue) { //synchronize so that only one thread can talk to the queue at a time
                        if (!isProducerClosed) { //double check
                            value = queue.take(); // we can now safely take an item
                            if ((value.intValue() == -1)) {
                                isProducerClosed = true;
                                System.out.println(Thread.currentThread().getName() + " Encountered -1. Signal to shutdown consumers.");
                                break;
                            }
                        } else {
                            System.out.println(Thread.currentThread().getName() + " Last item was taken by some other consumer. Exiting!");
                            break;
                        }
                    }
                    consumeValue(value); //Consume the value outside the lock
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    private void consumeValue(Integer value) {
        System.out.println(Thread.currentThread().getName() + " Consuming value :" + value);
    }
}

class Producer implements Runnable {

    private BlockingQueue<Integer> queue;
    private static volatile int maxNumberOfItemsToProduce = 10;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        Random random = new Random();

        while (true) {

            if (maxNumberOfItemsToProduce == 0) {
                try {
                    queue.put(-1);
                    System.out.println("Terminating Producer after producing max number of products.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                break;
            }

            try {
                queue.put(random.nextInt(300));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            maxNumberOfItemsToProduce--;
        }

    }
}
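
As a possible alternative to the flag plus synchronized block, the -1 marker itself can carry the shutdown signal: a consumer that takes it puts it straight back before exiting, so the next consumer blocked in take() receives it too. This is a sketch of that "poison pill" idea and not the answer's method. It assumes a single producer that enqueues the marker last, and the names PoisonPillDemo, PillConsumer, POISON_PILL, and runDemo are invented for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoisonPillDemo {
    static final int POISON_PILL = -1; // same end-of-stream marker as in the question

    static class PillConsumer implements Runnable {
        private final BlockingQueue<Integer> queue;
        private final AtomicInteger consumed; // counts real items, for the demo only

        PillConsumer(BlockingQueue<Integer> queue, AtomicInteger consumed) {
            this.queue = queue;
            this.consumed = consumed;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Integer value = queue.take();
                    if (value == POISON_PILL) {
                        // Re-enqueue the marker so the next waiting consumer also stops.
                        queue.put(POISON_PILL);
                        break;
                    }
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        }
    }

    // Returns the number of items consumed, or -1 if the consumers did not stop in time.
    static int runDemo(int items, int consumers) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        AtomicInteger consumed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int i = 0; i < consumers; i++) {
            pool.submit(new PillConsumer(queue, consumed));
        }
        try {
            for (int i = 0; i < items; i++) {
                queue.put(i); // the calling thread acts as the single producer
            }
            queue.put(POISON_PILL); // enqueued last, so every real item precedes it
            pool.shutdown();
            return pool.awaitTermination(10, TimeUnit.SECONDS) ? consumed.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("consumed " + runDemo(25, 3) + " items");
    }
}
```

One marker is left in the queue after the last consumer exits. With several producers, as in the sample code above, the marker would have to be enqueued only after every producer has finished, otherwise consumers could stop while items are still being produced.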

About java - AtomicBoolean not updated: a similar question was found on Stack Overflow: https://stackoverflow.com/questions/49724948/
