- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
本文中,我们将介绍一个 java.util.concurrent 包提供的用于解决并发生产者 – 消费者问题的最有用的类 – BlockQueue。我们将介绍BlockingQueue 接口的 API 以及如何使用该接口的方法使编写并发程序更容易。
在本文的后面,我们将展示一个具有多个生产者线程和多个消费者线程的简单程序的示例。
java.util.concurrent 提供了两种类型的 BlockingQueue:
1、 无限队列(unboundedqueue)–几乎可以无限增长;
2、 有限队列(boundedqueue)–定义了最大容量;
创建一个无限队列的方法很简单
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();
上面这段代码中,blockingQueue 的容量将设置为 Integer.MAX_VALUE 。
向无限队列添加元素的所有操作都将永远不会阻塞,因此它可以增长到非常大的容量。
使用无限 BlockingQueue 设计生产者 – 消费者模型时最重要的是 消费者应该能够像生产者向队列添加消息一样快地消费消息 。否则,内存可能会填满,然后就会得到一个 OutOfMemory 异常。
第二种类型的队列是有限队列。我们可以通过将容量作为参数传递给构造函数来创建这样的队列
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);
上面这句代码中,我们设置了 blockingQueue 的容量为 10 。这意味着当消费者尝试将元素添加到已经满了的队列时,结果取决于添加元素的方法( offer() 、add() 、put() ) ,它将阻塞,直到有足够的空间可以插入元素。否则,添加操作将会失败。
使用有限队列是设计并发程序的好方法,因为当我们将元素插入到已经满了的队列时,这些操作需要等到消费者赶上并在队列中提供一些空间。这种机制可以让那个我们不做任何其它更改就可以实现节流。
BlockingQueue 接口的所有方法可以分为两大类:负责向队列添加元素的方法和检索这些元素的方法。
在队列满/空的情况下,来自这两个组的每个方法的行为都不同。
BlockingQueue 提供了以下方法用于添加元素
方法 | 说明 |
---|---|
add() | 如果插入成功则返回 true,否则抛出 IllegalStateException 异常 |
put() | 将指定的元素插入队列,如果队列满了,那么会阻塞直到有空间插入 |
offer() | 如果插入成功则返回 true,否则返回 false |
offer(E e, long timeout, TimeUnit unit) | 尝试将元素插入队列,如果队列已满,那么会阻塞直到有空间插入 |
BlockingQueue 提供了以下方法用于检索元素
方法 | 说明 |
---|---|
take() | 获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用 |
poll(long timeout, TimeUnit unit) | 检索并删除队列的头部,如有必要,等待指定的等待时间以使元素可用,如果超时,则返回 null |
在构建生产者 – 消费者程序时,这些方法是 BlockingQueue 接口中最重要的构建块。
接下来我们创建一个由两部分组成的程序 – 生产者 ( Producer ) 和消费者 ( Consumer ) 。
生产者将生成一个 0 到 100 的随机数,并将该数字放在 BlockingQueue 中。我们将创建 4 个线程用于生成随机数并使用 put() 方法阻塞,直到队列中有可用空间。
需要记住的重要一点是,我们需要阻止我们的消费者线程无限期地等待元素出现在队列中。
从生产者向消费者发出信号的好方法是,不需要处理消息,而是发送称为毒 ( poison ) 丸 ( pill ) 的特殊消息。 我们需要发送尽可能多的毒 ( poison ) 丸 ( pill ) ,因为我们有消费者。然后当消费者从队列中获取特殊的毒 ( poison ) 丸 ( pill )消息时,它将优雅地完成执行。
我们来看以下生产者的代码
public class NumbersProducer implements Runnable {
private BlockingQueue<Integer> numbersQueue;
private final int poisonPill;
private final int poisonPillPerProducer;
public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
this.numbersQueue = numbersQueue;
this.poisonPill = poisonPill;
this.poisonPillPerProducer = poisonPillPerProducer;
}
public void run() {
try {
generateNumbers();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void generateNumbers() throws InterruptedException {
for (int i = 0; i < 100; i++) {
numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
}
for (int j = 0; j < poisonPillPerProducer; j++) {
numbersQueue.put(poisonPill);
}
}
}
我们的生成器构造函数将 BlockingQueue 作为参数,用于协调生产者和使用者之间的处理。我们看到方法 generateNumbers() 将 100 个元素放入队列中。它还需要有毒 ( poison ) 丸 ( pill ) 消息,以便知道在执行完成时放入队列的消息类型。该消息需要将 poisonPillPerProducer 次放入队列中。
每个消费者将使用 take() 方法从 BlockingQueue 获取一个元素,因此它将阻塞,直到队列中有一个元素。从队列中取出一个 Integer 后,它会检查该消息是否是毒 ( poison ) 丸 ( pill ) ,如果是,则完成一个线程的执行。否则,它将在标准输出上打印出结果以及当前线程的名称。
这将使我们深入了解消费者的内部运作机制
public class NumbersConsumer implements Runnable {
private BlockingQueue<Integer> queue;
private final int poisonPill;
public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
this.queue = queue;
this.poisonPill = poisonPill;
}
public void run() {
try {
while (true) {
Integer number = queue.take();
if (number.equals(poisonPill)) {
return;
}
System.out.println(Thread.currentThread().getName() + " result: " + number);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
需要注意的重要事项是队列的使用。与生成器构造函数中的相同,队列作为参数传递。我们可以这样做,是因为 BlockingQueue 可以在线程之间共享而无需任何显式同步。
既然我们有生产者和消费者,我们就可以开始我们的计划。我们需要定义队列的容量,并将其设置为 100 个元素。
我们希望有 4 个生产者线程,并且有许多消费者线程将等于可用处理器的数量
int BOUND = 10;
int N_PRODUCERS = 4;
int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
int mod = N_CONSUMERS % N_PRODUCERS;
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
for (int i = 1; i < N_PRODUCERS; i++) {
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
}
for (int j = 0; j < N_CONSUMERS; j++) {
new Thread(new NumbersConsumer(queue, poisonPill)).start();
}
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
BlockingQueue 是使用具有容量的构造创建的。我们正在创造 4 个生产者和 N 个消费者。我们将我们的毒 ( poison ) 丸 ( pill )消息指定为 Integer.MAX_VALUE,因为我们的生产者在正常工作条件下永远不会发送这样的值。这里要注意的最重要的事情是 BlockingQueue 用于协调它们之间的工作。
当我们运行程序时,4 个生产者线程将随机整数放入 BlockingQueue 中,消费者将从队列中获取这些元素。每个线程将打印到标准输出线程的名称和结果。
我正在尝试在多线程环境中实现某种累积逻辑;我想知道没有 lock 和 synchronized 关键字是否有更好/更快的方法来做到这一点?以下是我当前的代码: public class Concurr
我需要帮助构建一个实现信号量的监视器,简单的 C 示例就可以。 这是为了证明可以在任何可以使用信号量的地方使用监视器。 最佳答案 如果您说允许使用互斥锁/condvars,请检查: #include
我已经构建了一些返回部分产品目录的 ajax,并且我正在尝试将 xml 输出到文档中,到目前为止,这是我所拥有的: $("#catalog").append("Item NamePriceDe
很抱歉,如果我的问题之前已经被问过,或者它太明显了,但我真的需要澄清这一点。感谢您的帮助。 在多用户界面中,如果来自不同用户的相同事务同时到达服务器,会发生什么? 我有下一张表: create tab
这可能是一个愚蠢的问题,但是这个程序的输出(它的方式)可以为零吗? public class Test2{ int a = 0; AtomicInteger b = new Atomi
假设我本地主机上的一个网站处理每个请求大约需要 3 秒。这很好,正如预期的那样(因为它在幕后进行了一些奇特的网络)。 但是,如果我在选项卡(在 firefox 中)中打开相同的 url,然后同时重新加
我对 MongoDB 的读锁定有点困惑。单个集合可以支持多少个并发读取操作? 最佳答案 如 tk 给出的链接中所写:http://www.mongodb.org/pages/viewpage.acti
如果有四个并发的 CUDA 应用程序在一个 GPU 中竞争资源会发生什么这样他们就可以将工作卸载到图形卡上了? Cuda Programming Guide 3.1 提到那里 某些方法是异步的: 内核
👊上次的百度面试遇到了关于spark的并发数的问题,今天我们就来将这些问题都一并解决一下,图画的的有点丑,还行大家见谅,百度实习的问题我放在了下面的链接👇: 链接: 2022百度大数据开发工程师实
我对 Groovy 线程有疑问。 我的任务是以某种方式翻译给定目录中的每个文件 并将生成的输出放在其他目录中的文件中。 我编写了以下代码,该代码有效: static def translateDir(
Java中的同步和锁定有什么区别? 最佳答案 synchronized是语言关键字;锁是对象。 当一个方法或代码块被标记为同步时,您是说该方法或代码块必须先获得某个锁对象(可以在同步的语法中指定)才能
我需要创建一个能够同时处理来自客户端的多个请求的并发 RPC 服务器。 使用 rpcgen linux编译器(基于sun RPC),不支持-A为并发服务器创建 stub 的选项。 (-A 选项在 so
System.out.println("Enter the number of what you would like to do"); System.out.println("1 = Manuall
我正在将我的应用程序移植到 iOS 8.0 并注意到 UIAlertView 已被弃用。 所以我改变了使用 UIAlertController 的方法。这在大多数情况下都有效。 除了,当我的应用程序打
我正在逐行同时读取两个文本文件。 我特别想做的是当lineCount在每个线程上都是相同的我想看看扫描仪当前正在读取的字符串。 我环顾四周寻找可以实现的某些模式,例如 Compare and Swap
我正在阅读 Java Concurrency in Practice .在章节中断政策部分 取消和关闭 它提到 A task should not assume anything about the
我正在尝试学习线程,互斥等的基础知识。遵循here的文档和示例。在下面的代码中,我得到预期的输出。问题: 想确认我是否有任何陷阱?我们如何改善下面的代码? 我的线程在哪一行尝试获取互斥锁或正在等待互斥
并发是指两个任务在不同的线程上并行运行。但是,异步方法并行运行,但在同一个线程上。这是如何实现的?另外,并行性怎么样? 这三个概念有什么区别? 最佳答案 并发和并行实际上与您正确推测的原理相同,两者都
以此ConcurrentDouble类定义为例: public class ConcurrentDouble { public double num = 0; public void subt
在得知并发确实增加了许多人的吞吐量后,我一直计划在项目中使用并发。现在我在多线程或并发方面还没有做太多工作,因此决定在实际项目中使用它之前学习并进行简单的概念验证。 以下是我尝试过的两个示例: 1.
我是一名优秀的程序员,十分优秀!