- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章Java 并发编程ArrayBlockingQueue的实现由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
ArrayBlockingQueue 顾名思义:基于数组的阻塞队列。数组是要指定长度的,所以使用 ArrayBlockingQueue 时必须指定长度,也就是它是一个有界队列。它实现了 BlockingQueue 接口,有着队列、集合以及阻塞队列的所有方法.
ArrayBlockingQueue 是线程安全的,内部使用 ReentrantLock 来保证。ArrayBlockingQueue 支持对生产者线程和消费者线程进行公平的调度。当然默认情况下是不保证公平性的,因为公平性通常会降低吞吐量,但是可以减少可变性和避免线程饥饿问题.
通常,队列的实现方式有数组和链表两种方式。对于数组这种实现方式来说,我们可以通过维护一个队尾指针,使得在入队的时候可以在 O(1)O(1) 的时间内完成;但是对于出队操作,在删除队头元素之后,必须将数组中的所有元素都往前移动一个位置,这个操作的复杂度达到了 O(n)O(n),效果并不是很好。如下图所示:
为了解决这个问题,我们可以使用另外一种逻辑结构来处理数组中各个位置之间的关系。假设现在我们有一个数组 A[1…n],我们可以把它想象成一个环型结构,即 A[n] 之后是 A[1],相信了解过一致性 Hash 算法的童鞋应该很容易能够理解.
如下图所示:我们可以使用两个指针,分别维护队头和队尾两个位置,使入队和出队操作都可以在 O(1O(1 )的时间内完成。当然,这个环形结构只是逻辑上的结构,实际的物理结构还是一个普通的数组.
讲完 ArrayBlockingQueue 的数据结构,接下来我们从源码层面看看它是如何实现阻塞的.
items 是一个数组,用来存放入队的数据;count 表示队列中元素的个数;takeIndex 和 putIndex 分别代表队头和队尾指针.
第一个构造函数只需要指定队列大小,默认为非公平锁;第二个构造函数可以手动指定公平性和队列大小;第三个构造函数里面使用了 ReentrantLock 来加锁,然后把传入的集合元素按顺序一个个放入 items 中。这里加锁目的不是使用它的互斥性,而是让 items 中的元素对其他线程可见(参考 AQS 里的 state 的 volatile 可见性).
3.3.1 入队 。
ArrayBlockingQueue 提供了多种入队操作的实现来满足不同情况下的需求,入队操作有如下几种:
(1)add(E e) 。
可以看到 add 方法调用的是父类,也就是 AbstractQueue 的 add 方法,它实际上调用的就是 offer 方法.
(2)offer(E e) 。
我们接着上面的 add 方法来看 offer 方法:
offer 方法在队列满了的时候返回 false,否则调用 enqueue 方法插入元素,并返回 true.
enqueue 方法首先把元素放在 items 的 putIndex 位置,接着判断在 putIndex+1 等于队列的长度时把 putIndex 设置为0,也就是上面提到的圆环的 index 操作。最后唤醒等待获取元素的线程.
(3)offer(E e, long timeout, TimeUnit unit) 。
该方法在 offer(E e) 的基础上增加了超时的概念.
利用了 Condition 的 awaitNanos 方法,等待指定时间,因为该方法可中断,所以这里利用 while 循环来处理中断后还有剩余时间的问题,等待时间到了以后调用 enqueue 方法放入队列.
(4)put(E e) 。
put 方法在 count 等于 items 长度时,一直等待,直到被其他线程唤醒。唤醒后调用 enqueue 方法放入队列.
3.3.2 出队 。
入队列的方法说完后,我们来说说出队列的方法。ArrayBlockingQueue 提供了多种出队操作的实现来满足不同情况下的需求,如下:
(1)poll() 。
poll 方法是非阻塞方法,如果队列没有元素返回 null,否则调用 dequeue 把队首的元素出队列.
dequeue 会根据 takeIndex 获取到该位置的元素,并把该位置置为 null,接着利用圆环原理,在 takeIndex 到达列表长度时设置为0,最后唤醒等待元素放入队列的线程.
(2)poll(long timeout, TimeUnit unit) 。
该方法是 poll() 的可配置超时等待方法,和上面的 offer 一样,使用 while 循环配合 Condition 的 awaitNanos 来进行等待,等待时间到后执行 dequeue 获取元素.
(3)take() 。
取走队列里排在首位的对象,不同于 poll() 方法,若BlockingQueue为空,就阻塞等待直到有新的数据被加入。 (4)drainTo() 。
drainTo 相比于其他获取方法,能够一次性从队列中获取所有可用的数据对象(还可以指定获取数据的个数)。通过该方法,可以提升获取数据效率,不需要多次分批加锁或释放锁.
3.3.3 获取元素 。
这里获取元素时上锁是为了避免脏数据的产生.
3.3.4 删除元素 。
我们可以想象一下,队列中删除某一个元素时,是不是要遍历整个数据找到该元素,并把该元素后的所有元素往前移一位,也就是说,该方法的时间复杂度为 O(n)O(n).
remove 方法比较简单,它从 takeIndex 一直遍历到 putIndex,直到找到和元素 o 相同的元素,调用 removeAt 进行删除。我们重点来看一下 removeAt 方法.
removeAt 的处理方式和我想的稍微有一点出入,它内部分为两种情况来考虑:
也就是我考虑的时候没有考虑边界问题。当 removeIndex == takeIndex 时就不需要后面的元素整体往前移了,而只需要把 takeIndex的指向下一个元素即可(类比圆环);当 removeIndex != takeIndex 时,通过 putIndex 将 removeIndex 后的元素往前移一位.
ArrayBlockingQueue 是一个阻塞队列,内部由 ReentrantLock 来实现线程安全,由 Condition 的 await 和 signal 来实现等待唤醒的功能。它的数据结构是数组,准确的说是一个循环数组(可以类比一个圆环),所有的下标在到达最大长度时自动从 0 继续开始.
到此这篇关于Java 并发编程ArrayBlockingQueue的实现的文章就介绍到这了,更多相关Java 并发编程ArrayBlockingQueue内容请搜索我以前的文章或继续浏览下面的相关文章希望大家以后多多支持我! 。
原文链接:https://juejin.cn/post/6930401738021961742 。
最后此篇关于Java 并发编程ArrayBlockingQueue的实现的文章就讲到这里了,如果你想了解更多关于Java 并发编程ArrayBlockingQueue的实现的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
当我阅读ArrayBlockingQueue.take方法的源代码时,我遇到了一个问题。 我认为两个线程同时调用 take 方法,只有一个线程可以成功获取锁,而另一个线程将在以下行等待锁:lock.l
我正在寻找与 ArrayBlockingQueue 类似的库。就是这样,我不需要它提供的线程安全功能(为了更好的性能目的),因为它在 offer(E e) 方法中使用了 ReentrantLock 。
我编写了解决有界生产者和消费者问题的程序。在构造 ArrayBlockingQueue 时,我定义了容量 100。我正在使用方法 take 和 put inside threads。而且我注意到有时我
我刚刚在研究 JDK 1.6 时发现ArrayBlockingQueue - 构造函数调用了公共(public)可重写方法之一!我认为这对于 API 来说是一种不好的做法。 public Array
ArrayBlockingQueue 中没有一个操作与它的任何其他操作并发;他们总是拿同一把锁。即使对于 size() 方法,它也需要一个锁。 public int size() {
我正在研究 BlockingQueue 接口(interface),其中 ArrayBlockingQueue 是一个实现。出于演示目的,我开发了以下代码: import java.util.conc
只是为了学习,我编写了以下用于自定义线程池的代码,引用并编辑显示的代码 here. 如代码所示,我使用 ArrayBlockingQueue 作为任务队列。 代码: import java.util.
场景:在我的消费者有机会消费之前,我的生产者填满了数组,比如 capacity new int[10]。我的生产者看到数组已满并阻塞。 然后我的消费者出现并删除了 int[0],并向生产者发出信号,该
关键字synchronize 没有出现在ArrayBlockingQueue 的源代码中。这是否意味着我可以出于“我自己的目的”自由使用它的内在锁?或者这会在未来发生变化吗? 最佳答案 一般来说,我会
我有两个线程,一个分派(dispatch)消息,另一个解析消息。简单,常见。我使用 ArrayBlockingQueue 进行同步,但不希望调度程序直接访问工作消息队列 - 我使用包装器。问题是是否应
您好,我很好奇是否有办法检查 ArrayBlockingQuery 查询当前是否被锁定?原因:我有一个服务器,它监听套接字,接收参数,处理它们,然后将一些结果返回给客户端。该服务器(假设是服务器 A)
我知道下面代码中进行的递增不是原子的。我希望增量、插入阻塞队列和打印计数器的值一起成为一个原子操作。我知道原子 int 但我正在尝试使用同步来使其工作以用于学习目的。 int counter = 0;
对于 Java 中的 ArrayBlockingQueue,queue.add(element) 是否会锁定它所在的线程?我有一个运行着数十个线程的应用程序,它们会将所有信息放入一个 ArrayBlo
我有一个 ArrayBlockingQueue,它有多个与数据库的连接。许多线程尝试通过轮询来获取连接。队列中可用的最大连接数为50,超过50后,线程必须等待连接放回才能获取数据库连接。 问题是我无法
我发现自己在重复这种模式,并且常常想知道这在 Java 中是否是惯用的,或者是否有更好的方法来实现这种行为。 问题:给定生产者/消费者设置,消费者想要处理批量的项目,因此它使用 drainTo(),但
我有一个简单的 ArrayBlockingQueue 测试如下: public class TestQueue { static class Producer implements Runna
ArrayBlockingQueue 包含一个作为数组的缓冲区。它还支持公认的低效 public boolean remove(Object o) Removal of interior elemen
我正在尝试编写一个像ArrayBlockingQueue这样的简单队列,其中如果在添加元素时队列已满,则队列的头部将被删除。该类应该只具有以下公共(public)方法 获取队列的大小 从队列头部获取一
这是我第一次在 StackOverflow 上提问。我遇到的问题如下: 我有一个生产者和消费者类。在 Producer 类中,我逐行读取文件并将这些文本行放入字符串列表中。当列表有 x 行时。该列表被
我正在尝试调整执行以下操作的线程: 只有 1 个线程的线程池 [CorePoolSize =0, maxPoolSize = 1] 使用的队列是 ArrayBlockingQueue 问题 = 20
我是一名优秀的程序员,十分优秀!