- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
上篇文章谈到BlockingQueue的使用场景,并重点分析了ArrayBlockingQueue的实现原理,了解到ArrayBlockingQueue底层是基于数组实现的阻塞队列.
但是BlockingQueue的实现类中,有一种阻塞队列比较特殊,就是SynchronousQueue(同步移交队列),队列长度为0.
作用就是一个线程往队列放数据的时候,必须等待另一个线程从队列中取走数据。同样,从队列中取数据的时候,必须等待另一个线程往队列中放数据.
这样特殊的队列,有什么应用场景呢?
先看一个SynchronousQueue的简单用例:
/**
* @author 一灯架构
* @apiNote SynchronousQueue示例
**/
public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 1. 创建SynchronousQueue队列
BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>();
// 2. 启动一个线程,往队列中放3个元素
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 入队列 1");
synchronousQueue.put(1);
Thread.sleep(1);
System.out.println(Thread.currentThread().getName() + " 入队列 2");
synchronousQueue.put(2);
Thread.sleep(1);
System.out.println(Thread.currentThread().getName() + " 入队列 3");
synchronousQueue.put(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 3. 等待1000毫秒
Thread.sleep(1000L);
// 4. 再启动一个线程,从队列中取出3个元素
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
Thread.sleep(1);
System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
Thread.sleep(1);
System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
输出结果:
Thread-0 入队列 1
Thread-1 出队列 1
Thread-0 入队列 2
Thread-1 出队列 2
Thread-0 入队列 3
Thread-1 出队列 3
从输出结果中可以看到,第一个线程Thread-0往队列放入一个元素1后,就被阻塞了。直到第二个线程Thread-1从队列中取走元素1后,Thread-0才能继续放入第二个元素2.
由于SynchronousQueue是BlockingQueue的实现类,所以也实现类BlockingQueue中几组抽象方法:
为了满足不同的使用场景,BlockingQueue设计了很多的放数据和取数据的方法.
操作 | 抛出异常 | 返回特定值 | 阻塞 | 阻塞一段时间 |
---|---|---|---|---|
放数据 | add |
offer |
put |
offer(e, time, unit) |
取数据 | remove |
poll |
take |
poll(time, unit) |
查看数据(不删除) | element() |
peek() |
不支持 | 不支持 |
这几组方法的不同之处就是:
工作中使用最多的就是offer、poll阻塞指定时间的方法.
SynchronousQueue的特点:
队列长度是0,一个线程往队列放数据,必须等待另一个线程取走数据。同样,一个线程从队列中取数据,必须等待另一个线程往队列中放数据.
这种特殊的实现逻辑有什么应用场景呢?
我的理解就是, 如果你希望你的任务需要被快速处理 ,就可以使用这种队列.
Java线程池中的 newCachedThreadPool (带缓存的线程池)底层就是使用SynchronousQueue实现的.
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newCachedThreadPool 线程池的核心线程数是0,最大线程数是Integer的最大值,线程存活时间是60秒.
如果你使用 newCachedThreadPool 线程池,你提交的任务会被更快速的处理,因为你每次提交任务,都会有一个空闲的线程等着处理任务。如果没有空闲的线程,也会立即创建一个线程处理你的任务.
你想想,这处理效率,杠杠滴! 。
当然也有弊端,如果你提交了太多的任务,导致创建了大量的线程,这些线程都在竞争CPU时间片,等待CPU调度,处理任务速度也会变慢,所以在使用过程中也要综合考虑.
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
// 转换器,取数据和放数据的核心逻辑都在这个类里面
private transient volatile Transferer<E> transferer;
// 默认的构造方法(使用非公平队列)
public SynchronousQueue() {
this(false);
}
// 有参构造方法,可以指定是否使用公平队列
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
// 转换器实现类
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}
// 基于栈实现的非公平队列
static final class TransferStack<E> extends Transferer<E> {
}
// 基于队列实现的公平队列
static final class TransferQueue<E> extends Transferer<E> {
}
}
可以看到SynchronousQueue默认的无参构造方法,内部使用的是基于栈实现的非公平队列,当然也可以调用有参构造方法,传参是true,使用基于队列实现的公平队列.
// 使用非公平队列(基于栈实现)
BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>();
// 使用公平队列(基于队列实现)
BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>(true);
本次就常用的栈实现来剖析SynchronousQueue的底层实现原理.
栈结构,是非公平的,遵循先进后出.
使用个case测试一下:
/**
* @author 一灯架构
* @apiNote SynchronousQueue示例
**/
public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 1. 创建SynchronousQueue队列
SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>();
// 2. 启动一个线程,往队列中放1个元素
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 入队列 0");
synchronousQueue.put(0);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 3. 等待1000毫秒
Thread.sleep(1000L);
// 4. 启动一个线程,往队列中放1个元素
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 入队列 1");
synchronousQueue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 5. 等待1000毫秒
Thread.sleep(1000L);
// 6. 再启动一个线程,从队列中取出1个元素
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 7. 等待1000毫秒
Thread.sleep(1000L);
// 8. 再启动一个线程,从队列中取出1个元素
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
输出结果:
Thread-0 入队列 0
Thread-1 入队列 1
Thread-2 出队列 1
Thread-3 出队列 0
从输出结果中可以看出,符合栈结构先进后出的顺序.
栈中的数据都是由一个个的节点组成的,先看一下节点类的源码:
// 节点
static final class SNode {
// 节点值(取数据的时候,该字段为null)
Object item;
// 存取数据的线程
volatile Thread waiter;
// 节点模式
int mode;
// 匹配到的节点
volatile SNode match;
// 后继节点
volatile SNode next;
}
item 。
节点值,只在存数据的时候用。取数据的时候,这个值是null.
waiter 。
存取数据的线程,如果没有对应的接收线程,这个线程会被阻塞.
mode 。
节点模式,共有3种类型:
类型值 | 类型描述 | 类型的作用 |
---|---|---|
0 | REQUEST | 表示取数据 |
1 | DATA | 表示存数据 |
2 | FULFILLING | 表示正在等待执行(比如取数据的线程,等待其他线程放数据) |
放数据和取数据的逻辑,在底层复用的是同一个方法,以put/take方法为例,另外两个放数据的方法,add和offer方法底层实现是一样的.
先看一下数据流转的过程,方便理解源码.
还是以上面的case为例:
第一步:Thread0先往SynchronousQueue队列中放入元素0 。
把本次操作组装成SNode压入栈顶,item是元素0,waiter是当前线程Thread0,mode是1表示放入数据.
第二步:Thread1再往SynchronousQueue队列放入元素1 。
把本次操作组装成SNode压入栈顶,item是元素1,waiter是当前线程Thread1,mode是1表示放入数据,next是SNode0.
第三步:Thread2从SynchronousQueue队列中取出一个元素 。
这次的操作比较复杂,也是先把本次的操作包装成SNode压入栈顶.
item是null(取数据的时候,这个字段没有值),waiter是null(当前线程Thread2正在操作,所以不用赋值了),mode是2表示正在操作(即将跟后继节点进行匹配),next是SNode1.
然后,Thread2开始把栈顶的两个节点进行匹配,匹配成功后,就把SNode2赋值给SNode1的match属性,唤醒SNode1中的Thread1线程,然后弹出SNode2节点和SNode1节点.
看完 了put/take流程,再来看源码就简单多了.
先看一下put方法源码:
// 放数据
public void put(E e) throws InterruptedException {
// 不允许放null元素
if (e == null)
throw new NullPointerException();
// 调用转换器实现类,放元素
if (transferer.transfer(e, false, 0) == null) {
// 如果放数据失败,就中断当前线程,并抛出异常
Thread.interrupted();
throw new InterruptedException();
}
}
核心逻辑都在transfer方法中,代码很长,理清逻辑后,也很容易理解.
// 取数据和放数据操作,共用一个方法
E transfer(E e, boolean timed, long nanos) {
SNode s = null;
// e为空,说明是取数据,否则是放数据
int mode = (e == null) ? REQUEST : DATA;
for (; ; ) {
SNode h = head;
// 1. 如果栈顶节点为空,或者栈顶节点类型跟本次操作相同(都是取数据,或者都是放数据)
if (h == null || h.mode == mode) {
// 2. 判断节点是否已经超时
if (timed && nanos <= 0) {
// 3. 如果栈顶节点已经被取消,就删除栈顶节点
if (h != null && h.isCancelled())
casHead(h, h.next);
else
return null;
// 4. 把本次操作包装成SNode,压入栈顶
} else if (casHead(h, s = snode(s, e, h, mode))) {
// 5. 挂起当前线程,等待被唤醒
SNode m = awaitFulfill(s, timed, nanos);
// 6. 如果这个节点已经被取消,就删除这个节点
if (m == s) {
clean(s);
return null;
}
// 7. 把s.next设置成head
if ((h = head) != null && h.next == s)
casHead(h, s.next);
return (E) ((mode == REQUEST) ? m.item : s.item);
}
// 8. 如果栈顶节点类型跟本次操作不同,并且不是FULFILLING类型
} else if (!isFulfilling(h.mode)) {
// 9. 再次判断如果栈顶节点已经被取消,就删除栈顶节点
if (h.isCancelled())
casHead(h, h.next);
// 10. 把本次操作包装成SNode(类型是FULFILLING),压入栈顶
else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) {
// 11. 使用死循环,直到匹配到对应的节点
for (; ; ) {
// 12. 遍历下个节点
SNode m = s.next;
// 13. 如果节点是null,表示遍历到末尾,设置栈顶节点是null,结束。
if (m == null) {
casHead(s, null);
s = null;
break;
}
SNode mn = m.next;
// 14. 如果栈顶的后继节点跟栈顶节点匹配成功,就删除这两个节点,结束。
if (m.tryMatch(s)) {
casHead(s, mn);
return (E) ((mode == REQUEST) ? m.item : s.item);
} else
// 15. 如果没有匹配成功,就删除栈顶的后继节点,继续匹配
s.casNext(m, mn);
}
}
} else {
// 16. 如果栈顶节点类型跟本次操作不同,并且是FULFILLING类型,
// 就再执行一遍上面第11步for循环中的逻辑(很少概率出现)
SNode m = h.next;
if (m == null)
casHead(h, null);
else {
SNode mn = m.next;
if (m.tryMatch(h))
casHead(h, mn);
else
h.casNext(m, mn);
}
}
}
}
transfer方法逻辑也很简单,就是判断本次操作类型是否跟栈顶节点相同,如果相同,就把本次操作压入栈顶。否则就跟栈顶节点匹配,唤醒栈顶节点线程,弹出栈顶节点.
transfer方法中调用了awaitFulfill方法, 作用是 挂起当前线程.
// 等待被唤醒
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
// 1. 计算超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 2. 计算自旋次数
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel();
// 3. 如果已经匹配到其他节点,直接返回
SNode m = s.match;
if (m != null)
return m;
if (timed) {
// 4. 超时时间递减
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
// 5. 自旋次数减一
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
s.waiter = w;
// 6. 开始挂起当前线程
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
awaitFulfill方法的逻辑也很简单,就是挂起当前线程.
take方法底层使用的也是transfer方法:
// 取数据
public E take() throws InterruptedException {
// // 调用转换器实现类,取数据
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
// 没取到,就中断当前线程
Thread.interrupted();
throw new InterruptedException();
}
我是「一灯架构」,如果本文对你有帮助,欢迎各位小伙伴点赞、评论和关注,感谢各位老铁,我们下期见 。
最后此篇关于Java阻塞队列中的异类,SynchronousQueue底层实现原理剖析的文章就讲到这里了,如果你想了解更多关于Java阻塞队列中的异类,SynchronousQueue底层实现原理剖析的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我想做的是让 JTextPane 在 JPanel 中占用尽可能多的空间。对于我使用的 UpdateInfoPanel: public class UpdateInfoPanel extends JP
我在 JPanel 中有一个 JTextArea,我想将其与 JScrollPane 一起使用。我正在使用 GridBagLayout。当我运行它时,框架似乎为 JScrollPane 腾出了空间,但
我想在 xcode 中实现以下功能。 我有一个 View Controller 。在这个 UIViewController 中,我有一个 UITabBar。它们下面是一个 UIView。将 UITab
有谁知道Firebird 2.5有没有类似于SQL中“STUFF”函数的功能? 我有一个包含父用户记录的表,另一个表包含与父相关的子用户记录。我希望能够提取用户拥有的“ROLES”的逗号分隔字符串,而
我想使用 JSON 作为 mirth channel 的输入和输出,例如详细信息保存在数据库中或创建 HL7 消息。 简而言之,输入为 JSON 解析它并输出为任何格式。 最佳答案 var objec
通常我会使用 R 并执行 merge.by,但这个文件似乎太大了,部门中的任何一台计算机都无法处理它! (任何从事遗传学工作的人的附加信息)本质上,插补似乎删除了 snp ID 的 rs 数字,我只剩
我有一个以前可能被问过的问题,但我很难找到正确的描述。我希望有人能帮助我。 在下面的代码中,我设置了varprice,我想添加javascript变量accu_id以通过rails在我的数据库中查找记
我有一个简单的 SVG 文件,在 Firefox 中可以正常查看 - 它的一些包装文本使用 foreignObject 包含一些 HTML - 文本包装在 div 中:
所以我正在为学校编写一个 Ruby 程序,如果某个值是 1 或 3,则将 bool 值更改为 true,如果是 0 或 2,则更改为 false。由于我有 Java 背景,所以我认为这段代码应该有效:
我做了什么: 我在这些账户之间创建了 VPC 对等连接 互联网网关也连接到每个 VPC 还配置了路由表(以允许来自双方的流量) 情况1: 当这两个 VPC 在同一个账户中时,我成功测试了从另一个 La
我有一个名为 contacts 的表: user_id contact_id 10294 10295 10294 10293 10293 10294 102
我正在使用 Magento 中的新模板。为避免重复代码,我想为每个产品预览使用相同的子模板。 特别是我做了这样一个展示: $products = Mage::getModel('catalog/pro
“for”是否总是检查协议(protocol)中定义的每个函数中第一个参数的类型? 编辑(改写): 当协议(protocol)方法只有一个参数时,根据该单个参数的类型(直接或任意)找到实现。当协议(p
我想从我的 PHP 代码中调用 JavaScript 函数。我通过使用以下方法实现了这一点: echo ' drawChart($id); '; 这工作正常,但我想从我的 PHP 代码中获取数据,我使
这个问题已经有答案了: Event binding on dynamically created elements? (23 个回答) 已关闭 5 年前。 我有一个动态表单,我想在其中附加一些其他 h
我正在尝试找到一种解决方案,以在 componentDidMount 中的映射项上使用 setState。 我正在使用 GraphQL连同 Gatsby返回许多 data 项目,但要求在特定的 pat
我在 ScrollView 中有一个 View 。只要用户按住该 View ,我想每 80 毫秒调用一次方法。这是我已经实现的: final Runnable vibrate = new Runnab
我用 jni 开发了一个 android 应用程序。我在 GetStringUTFChars 的 dvmDecodeIndirectRef 中得到了一个 dvmabort。我只中止了一次。 为什么会这
当我到达我的 Activity 时,我调用 FragmentPagerAdapter 来处理我的不同选项卡。在我的一个选项卡中,我想显示一个 RecyclerView,但他从未出现过,有了断点,我看到
当我按下 Activity 中的按钮时,会弹出一个 DialogFragment。在对话框 fragment 中,有一个看起来像普通 ListView 的 RecyclerView。 我想要的行为是当
我是一名优秀的程序员,十分优秀!