- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
首先,为了防止那些不喜欢阅读到最后的人将问题标记为重复,我已经阅读了 Producer-Consumer Logging service with Unreliable way to shutdown问题。但这并没有完全回答问题,答案与书上的文字相矛盾。
书中提供了如下代码:
public class LogWriter {
private final BlockingQueue<String> queue;
private final LoggerThread logger;
private static final int CAPACITY = 1000;
public LogWriter(Writer writer) {
this.queue = new LinkedBlockingQueue<String>(CAPACITY);
this.logger = new LoggerThread(writer);
}
public void start() {
logger.start();
}
public void log(String msg) throws InterruptedException {
queue.put(msg);
}
private class LoggerThread extends Thread {
private final PrintWriter writer;
public LoggerThread(Writer writer) {
this.writer = new PrintWriter(writer, true); // autoflush
}
public void run() {
try {
while (true)
writer.println(queue.take());
} catch (InterruptedException ignored) {
} finally {
writer.close();
}
}
}
}
现在我们应该了解如何停止这个过程。我们应该停止记录但不应该跳过已经提交的消息。
作者研究方法:
public void log(String msg) throws InterruptedException {
if(!shutdownRequested)
queue.put(msg);
else
throw new IllegalArgumentException("logger is shut down");
}
然后这样评论:
Another approach to shutting down LogWriter would be to set a “shutdown requested” flag to prevent further messages from being submitted, as shown in Listing 7.14. The con- sumer could then drain the queue upon being notified that shutdown has been requested, writing out any pending messages and unblocking any producers blocked in log . However, this approach has race conditions that make it unreliable. The implementation of log is a check-then-act sequence: producers could observe that the service has not yet been shut down but still queue messages after the shutdown, again with the risk that the producer might get blocked in log and never become unblocked. There are tricks that reduce the likelihood of this (like having the consumer wait several seconds before declaring the queue drained), but these do not change the fundamental problem, merely the likelihood that it will cause a fail- ure.
这句话对我来说已经够难了。
我明白了
if(!shutdownRequested)
queue.put(msg);
不是原子的,消息可以在关闭后添加到队列中。是的,它不是很准确,但我看不出问题。队列将被排空,当队列为空时我们可以停止 LoggerThread。特别是我不明白为什么生产者会被屏蔽。
作者没有提供完整的代码,因此我无法理解所有细节。我相信这本书被社区大多数人读过,这个例子有详细的解释。
请用完整的代码示例进行解释。
最佳答案
首先要了解的是,当请求关闭时,生产者需要停止接受任何更多请求,而消费者(在本例中为 LoggerThread
)需要清空队列。您在问题中提供的代码仅展示了故事的一方面;当 shutdownRequested
时,生产者拒绝任何进一步的请求是true
.在这个例子之后,作者继续说:
The consumer could then drain the queue upon being notified that shutdown has been requested, writing out any pending messages and unblocking any producers blocked in log
首先,queue.take
在 LoggerThread
如您的问题所示,将无限阻止队列中可用的新消息;但是,如果我们想关闭 LoggerThread
(优雅地),我们需要确保 LoggerThread
中的关机代码shutdownRequested
时有机会执行是真实的而不是无限地被 queue.take
阻止.
当作者说消费者可以耗尽队列时,他的意思是LogWritter
可以查看shutdownRequested
如果为真,它可以调用非阻塞 drainTo方法在单独的集合中排出队列的当前内容,而不是调用 queue.take
(或改为调用类似的非阻塞方法)。或者,如果 shutdownRequested
是假的,LogWriter
可以继续调用queue.take
照常。
这种方法的真正问题是 log
的方式方法(由生产者调用)被实现。由于它不是原子的,因此多个线程可能会错过 shutdownRequested
的设置。为真。如果错过此更新的线程数大于 CAPACITY
会发生什么的 queue
.我们来看看log
再次方法。 (为了便于解释,添加了大括号):
public void log(String msg) throws InterruptedException {
if(!shutdownRequested) {//A. 1001 threads see shutdownRequested as false and pass the if condition.
//B. At this point, shutdownRequested is set to true by client code
//C. Meanwhile, the LoggerThread which is the consumer sees that shutdownRequested is true and calls
//queue.drainTo to drain all existing messages in the queue instead of `queue.take`.
//D. Producers insert the new message into the queue.
queue.put(msg);//Step E
} else
throw new IllegalArgumentException("logger is shut down");
}
}
如步骤E所示,多个生产者线程可以调用put
。而 LoggerThread
完成排空队列并退出 w。在第 1000 线程调用 put
之前应该没有问题.真正的问题是当第 1001 线程调用 put
时.它会阻塞,因为队列容量只有 1000 并且 LoggerThread
可能不再存在或订阅了 queue.take
方法。
关于java - 为什么 LogWriter 中的竞争条件会导致生产者阻塞? [实践中的并发],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42363351/
kafka的java客户端-生产者 生产者消息发送流程 发送原理 在消息发送的过程中,涉及俩个线程,main线程和sender线程,在main线程中创建一个双端队列RecordAccumulator。
我使用互斥体和条件编写了一个生产者/消费者程序。它使用全局 int 来生成和使用值。有 1 个消费者线程和多个生产者线程。 规则: 当值太小时,消费者会等待。 当值太大时,生产者就会等待。 我的问题是
我有兴趣发现当有多个产品和多个消费者时是否可以在不使用赋值的情况下解决生产者 - 消费者问题,即使用函数式编程风格?如何? Producer-consumer problem 谢谢 最佳答案 是的,您
单个进程中的两个不同线程可以通过读取和/或写入共享一个公共(public)内存位置。 通常,这种(有意的)共享是通过使用 lock 的原子操作来实现的。 x86 上的前缀,对于 lock前缀本身(即非
我正在尝试编写一个简单的生产者-消费者应用程序,在该应用程序中,我需要从文件中读取大块数据(可能很大),并且(出于简单测试目的)只需通过另一个线程将其写入另一个文件中即可。 我尝试了很多在线资源,但是
我已经为kafka(wurstmeister / kafka-docker)构建了一个docker镜像。在docker容器内部,我能够使用内置的shell脚本创建主题,生成消息并使用消息。现在,我正在
我正在尝试模拟关于多线程的生产者-消费者模型。 我们假设要遵守三个规则: 当桶装满产品时,生产者不能将产品添加到桶中。 当桶为空时,消费者无法从桶中获取产品。 生产和消费不能同时进行。换句话说,这两个
我有一个生成器应用程序,可以生成索引(将其存储在某些内存树数据结构中)。消费者应用程序将使用索引来搜索部分匹配。 我不希望消费者 UI 在生产者索引数据时必须阻塞(例如通过某些进度条)。基本上,如果用
我正在尝试为我遇到的排队问题找到解决方案。在典型的场景中,生产者将一些东西放入队列中,而消费者将其取出。如果我们有一个也消费的生产者和一个最初从队列中取出某些内容然后将某些内容(例如结果)放回到队列中
虽然以下是众所周知的话题,但我想请您提供意见。我写了一个小程序如下:所有生产者和消费者都排队。我不明白为什么会这样。什么场景下可以完全阻塞。 让我们考虑一下生产者/消费者正在等待数组上的锁,以及是什么
下面是我用于实现生产者-消费者问题的代码。使用 notifyAll() 一切正常,但是由于性能原因,我想用 notify() 替换所有出现的 notifyAll() >. 我发现通过将 notifyA
我有一个生产者-消费者的基本实现,如下所示: 我的问题是如何使线程数:x ~ y 来提高应用程序性能和负载平衡?有人有关键字或提示吗?预先感谢您! 最佳答案 您应该能够通过 Little's La
我编写了一个类“Producer”,它连续解析特定文件夹中的文件。解析的结果将存储在Consumer的队列中。 public class Producer extends Thread { p
我遇到“生产者 - 消费者任务”中可能出现死锁的问题。一切都应该按以下方式进行: 生产者应该生成 int[] 数组并将其添加到集合中 消费者应该获取这些数组,将它们放入第二个集合并在输出中打印 在 D
我正在为我的操作系统类(class)做一个 CPU 调度模拟器项目。该程序应包含两个线程:生产者线程和消费者线程。生产者线程包括在系统中生成进程的生成器和选择多个进程并将它们放入一个名为 Buffer
我想知道是否可以通过 AMQP 和 RabbitMQ 为生产者和消费者使用不同的语言? 例如:Java 用于生产者,python/php 用于消费者,还是反之? 最佳答案 是的,AMQP 与语言无关,
编辑:我有一个生产者类,它将一些数据发送到 SharedBuffer 类。该数据被添加到 ArrayList 中,限制设置为 100。将数据添加到所述列表中没有问题,但消费者类无法从列表中获取任何数据
我正在尝试在有界缓冲区中使用生产者/消费者线程。缓冲区长度为 5。我有 1 个互斥体和 2 个信号量,空信号量从缓冲区大小开始,满信号量从 0 开始。 当我在最后没有 sleep() 的情况下运行代码
关闭。这个问题需要多问focused 。目前不接受答案。 想要改进此问题吗?更新问题,使其仅关注一个问题 editing this post . 已关闭 9 年前。 Improve this ques
我用Java的LinkedBlockingDeque实现了生产者-消费者模式,但我遇到了一个问题,我有时想将一个项目(已经在队列中的某个位置)移动到队列的前面,以便更快地处理它。我永远不知道哪些已经排
我是一名优秀的程序员,十分优秀!