- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在实现一个标准的生产者-消费者程序,以便生产者在生产 200 个产品后停止。为了表明这一点,生产者将 -1 放入 BlockingQueue 变量中,否则该变量始终包含正整数。我的消费者实现如下:
public class Consumer implements Runnable{
private BlockingQueue<Integer> queue;
private AtomicBoolean isProducerClosed = new AtomicBoolean(false);
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
Thread.sleep(50);
while(!isProducerClosed.get()) {
try {
Integer value = queue.take();
if ((value.intValue() == -1)){
System.out.println(Thread.currentThread().getName() + " Encountered -1. Signal to shutdown consumers.");
//isProducerClosed.set(true);
isProducerClosed.compareAndSet(false, true);
break;
}
System.out.println(Thread.currentThread().getName() + " consuming : " + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
替代消费者逻辑(仍然遇到相同的问题):
@Override
public void run() {
while(true) {
try {
Thread.sleep(50);
Integer value = null;
synchronized (this) {
if(!isProducerClosed.get()) {
value = queue.take();
if ((value.intValue() == -1)) {
System.out.println(Thread.currentThread().getName() + " Encountered -1. Signal to shutdown consumers.");
isProducerClosed.set(true);
isProducerClosed = isProducerClosed;
System.out.println("Current value of isProducerClosed : " + isProducerClosed.get());
}
}
}
if (!isProducerClosed.get()) {
System.out.println(Thread.currentThread().getName() + " consuming : " + value);
} else {
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
执行此操作时,我的消费者线程卡在 queue.take()
上,就好像它们正在等待队列中的产品可用一样。一种可能是:所有消费者线程同时检查条件 isProducerClosed.get()
,进入 while 循环,并访问 queue.take()
方法。这是正确的假设吗?如果是,是否有任何方法可以在不使用低级 synchronized
关键字的情况下实现此目的?我用 volatile boolean 变量尝试了同样的事情,结果完全相同。只有在将该变量设为静态后,我才能看到所有消费者在队列中遇到 -1 后被终止(因为 var 现在是类拥有的)。
我的调用代码:
public class ProducerConsumer {
private BlockingQueue<Integer> sharedQueue = new ArrayBlockingQueue<Integer>(10);
public void executeProducerConsumer(int producerCount, int consumerCount){
ExecutorService executorService = Executors.newFixedThreadPool(3, Executors.defaultThreadFactory());
for(int i = 0; i < producerCount; i++) {
executorService.submit(new Producer(sharedQueue)); //producer
}
for(int i = 0; i < consumerCount; i++) {
executorService.submit(new Consumer(sharedQueue)); //i-th consumer.
}
//initiates clossure of threads after completion, in async manner.
executorService.shutdown();
//wait till all threads are done.
try {
executorService.awaitTermination(1, TimeUnit.HOURS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("The End.");
}
}
生产者代码:
public class Producer implements Runnable {
private BlockingQueue<Integer> queue;
private volatile int maxNumberOfItemsToProduce = 10;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
Random random = new Random();
while(true){
if(maxNumberOfItemsToProduce == 0) {
try {
queue.put(-1);
System.out.println("Terminating Producer after producing max number of products.");
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
}
try {
queue.put(random.nextInt(300));
} catch (InterruptedException e) {
e.printStackTrace();
}
maxNumberOfItemsToProduce--;
}
}
}
最佳答案
One possibility could be : all consumer threads checked the condition isProducerClosed.get() at the same time, entered the while loop, and access queue.take() method. Is this correct assumption? If yes, is there any way to implement this without using low level synchronized keyword? I tried the same thing with volatile boolean variable and the result was exactly same. Only after making that variable static, was i able to see all consumers getting terminated after encountering -1 in the queue (as the var is now class-owned).
是的,这个假设是正确的。
第一个问题是:isProducerClosed
不在消费者之间共享。它必须在消费者之间共享,这样如果一个消费者设置了它的值,其他消费者也可以看到该值。使其静态
使其共享,因此情况之后会有所改善
第二个问题:即使在共享 isProducerClosed
后,您也可能会遇到这样的情况:多个消费者会在空队列上执行 queue.take()
(一个线程可能会获取最后一个值,但另一个线程在第一个线程将 isProducerClosed
设置为 true 之前执行 take()
)。您需要对此进行同步(例如通过双重检查)
示例代码(除消费者之外的部分仍包含错误/竞争)-
public class TestClass {
private BlockingQueue<Integer> sharedQueue = new ArrayBlockingQueue<Integer>(10);
public static void main(String[] args) {
TestClass t = new TestClass();
t.executeProducerConsumer(3, 3);
}
public void executeProducerConsumer(int producerCount, int consumerCount) {
ExecutorService executorService = Executors.newFixedThreadPool(producerCount + consumerCount, Executors.defaultThreadFactory());
for (int i = 0; i < producerCount; i++) {
executorService.submit(new Producer(sharedQueue)); //producer
}
for (int i = 0; i < consumerCount; i++) {
executorService.submit(new Consumer(sharedQueue)); //i-th consumer.
}
//initiates clossure of threads after completion, in async manner.
executorService.shutdown();
//wait till all threads are done.
try {
executorService.awaitTermination(1, TimeUnit.HOURS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("The End.");
}
}
class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
private static volatile boolean isProducerClosed = false; // make this static so that it is shared across consumers
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
Thread.sleep(50);
Integer value;
while (!isProducerClosed) {
try {
synchronized (queue) { //synchronize so that only one thread can talk to the queue at a time
if (!isProducerClosed) { //double check
value = queue.take(); // we can now safely take an item
if ((value.intValue() == -1)) {
isProducerClosed = true;
System.out.println(Thread.currentThread().getName() + " Encountered -1. Signal to shutdown consumers.");
break;
}
} else {
System.out.println(Thread.currentThread().getName() + " Last item was taken by some other consumer. Exiting!");
break;
}
}
consumeValue(value); //Consume the value outside the lock
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void consumeValue(Integer value) {
System.out.println(Thread.currentThread().getName() + " Consuming value :" + value);
}
}
class Producer implements Runnable {
private BlockingQueue<Integer> queue;
private static volatile int maxNumberOfItemsToProduce = 10;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
Random random = new Random();
while (true) {
if (maxNumberOfItemsToProduce == 0) {
try {
queue.put(-1);
System.out.println("Terminating Producer after producing max number of products.");
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
}
try {
queue.put(random.nextInt(300));
} catch (InterruptedException e) {
e.printStackTrace();
}
maxNumberOfItemsToProduce--;
}
}
}
关于java - AtomicBoolean 未更新,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49724948/
我想知道调用之间是否有任何区别(或可能的副作用): AtomicBoolean.set(true) 和 AtomicBoolean.compareAndset(false, true) AtomicB
我正在实现一个标准的生产者-消费者程序,以便生产者在生产 200 个产品后停止。为了表明这一点,生产者将 -1 放入 BlockingQueue 变量中,否则该变量始终包含正整数。我的消费者实现如下:
AtomicBoolean 使用 native 代码进行同步。它如何转化为 java 锁? 有什么区别: AtomicBoolean a = new AtomicBoolean(); synchron
我了解 AtomicInteger 的有效用例,但我对 AtomicBoolean 如何保证两个操作的原子性感到困惑 i. '改变 boolean 值'和ii。在以下经常引用的 AtomicBoole
我经常使用以下模式来创建可取消的线程: public class CounterLoop implements Runnable { private volatile AtomicBoolea
我正在使用 AtomicBoolean 来强制执行线程之间的 volatile 可见性。一个线程正在更新值,另一个线程只读取它。 假设当前值为 true。现在假设一个写线程再次将其值设置为true:
我有一个系统,有很多写入器和一个读取器,每个读取器都在一个单独的线程中运行。当工作可用时,作者会通知读者,而读者会阻塞,直到收到通知。 鉴于作者的数量,我想使用无锁实现来通知读者。每次读者醒来时,它都
在我正在开发的应用程序中,我发现了以下代码片段: public class MyClass { private AtomicBoolean atomicBoolean = new Atomic
我不喜欢用 synchronized(this) 锁定我的代码,所以我正在尝试使用 AtomicBooleans。在代码片段中,XMPPConnectionIF.connect() 与远程服务器建立套
这个问题已经有答案了: When is it preferable to use volatile boolean in Java rather than AtomicBoolean? [duplic
我需要确保每个实例生命周期仅执行一次特定的启动和停止代码,并且该实例无法“重新启动”。以下代码足以满足多个线程可能对实例进行操作的场景吗? public final class MyRunnable
我有一个简单的类,我想使用 AtomicBoolean 测试它的线程安全性。如果测试的线程超过 2 个,则它不起作用。它在某些线程的 doSome 方法中抛出 NullPointerExcetion:
阅读此代码AsyncSubscriber.java :编码器使用 AtomicBoolean 创建 Happens Before 关系,我想知道: 1_是否相当于使用synchronized bloc
MyRunnable 有一个原子 boolean 值,需要通过一种方法将其设置为 true, 另一种在多线程环境中设置为 false 的方法 private final class MyRunn
我有下一个任务 public class QuittableTask implements Runnable { final int id; public QuittableTask(int
我正在为 AtomicInteger 和 AtomicBoolean 编写单元测试。它们将用作引用测试,用于测试 objective-c 中这些类的仿真,用于翻译项目。 我认为 AtomicInteg
这是我编写的一些代码,用于以通用方式处理一个事件处理程序,该事件处理程序对于任何 JavaFX 事件都应该只触发一次: public final class OneShotEvent impl
如何确保 initialize() 方法只被调用一次?下面是我想重构为使用 AtomicBoolean 的线程不安全版本。 我只想要 initialize() 只被调用一次 if (!initiali
我正在寻找一种暂停线程的方法。 我开始使用一个 boolean 标志(称为“暂停”),并用一个 while 循环(暂停)包装一个检查。 在 while 循环中有一个 Thread.wait() 来阻止
我将 lambda 表达式作为参数传递给方法,并且我想修改在 lambda 表达式之外定义的变量。 我已经尝试了几件事。目前,我有一个 AtomicBoolean调用success .在 lambda
我是一名优秀的程序员,十分优秀!