- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
欢迎关注: 王有志 期待你加入Java人的提桶跑路群: 共同富裕的Java人 。
今天来和大家聊聊 Condition , Condition 为AQS“家族” 提供了等待与唤醒的能力 ,使AQS"家族"具备了像 synchronized 一样暂停与唤醒线程的能力。我们先来看两道关于 Condition 的面试题目:
Condition
和 Object
的等待与唤醒有什么区别? Condition
队列? 接下来,我们就按照“是什么”,“怎么用”和“如何实现”的顺序来揭开 Condition 的面纱吧.
Condition 是Java中的接口,提供了与 Object#wait 和 Object#notify 相同的功能。Doug Lea在 Condition 接口的描述中提到了这点:
Conditions (also known as condition queues or condition variables) provide a means for one thread to suspend execution (to "wait") until notified by another thread that some state condition may now be true. 。
来看 Condition 接口中提供了哪些方法:
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
Condition 只提供了两个功能: 等待(await)和唤醒(signal) ,与 Object 提供的等待与唤醒时相似的:
public final void wait() throws InterruptedException;
public final void wait(long timeoutMillis, int nanos) throws InterruptedException;
public final native void wait(long timeoutMillis) throws InterruptedException;
@HotSpotIntrinsicCandidate
public final native void notify();
@HotSpotIntrinsicCandidate
public final native void notifyAll();
唤醒功能上, Condition 与 Object 的差异并不大:
Condition#signal \(\approx\) Object#notify 。
Condition#signalAll \(=\) Object#notifyAll 。
多个线程处于等待状态时, Object#notify() 是“随机”唤醒线程,而 Condition#signal 则由具体实现决定如何唤醒线程,如: ConditionObject 唤醒的是最早进入等待的线程 , 但两个方法均只唤醒一个线程.
等待功能上, Condition 与 Object 的共同点是: 都会释放持有的资源 , Condition 释放锁 , Object 释放Monitor,即进入等待状态后允许其他线程获取锁/监视器 。主要的差异体现在 Condition 支持了更加丰富的场景,通过一张表格来对比下:
Condition 方法 |
Object 方法 |
解释 |
---|---|---|
Condition#await() |
Object#wait() |
暂停线程,抛出线程中断异常 |
Condition#awaitUninterruptibly() |
/ | 暂停线程,不抛出线程中断异常 |
Condition#await(time, unit) |
Object#wait(timeoutMillis, nanos) |
暂停线程,直到被唤醒或等待指定时间后,超时后自动唤醒返回false,否则返回true |
Condition#awaitUntil(deadline) |
/ | 暂停线程,直到被唤醒或到达指定时间点,超时后自动唤醒返回false,否则返回true |
Condition#awaitNanos(nanosTimeout) |
/ | 暂停线程,直到被唤醒或等待指定时间后,返回值表示被唤醒时的剩余时间(nanosTimeout-耗时),结果为负数表示超时 |
除了以上差异外, Condition 还支持创建多个等待队列,即同一把锁拥有多个等待队列,线程在不同队列中等待,而 Object 只有一个等待队列 。《Java并发编程的艺术》中也有一张类似的表格,放在这里供大家参考:
Tips :
Condition
的实现部分,下文通过AQS中的 ConditionObject
详细解释。 既然 Condition 与 Object 提供的等待与唤醒功能相同,那么它们的用法是不是也很相似呢?
与调用 Object#wait 和 Object#notifyAll 必须处于 synchronized 修饰的代码中一样(获取Monitor), 调用 Condition#await 和 Condition#signalAll 的前提是要先获取锁 。但不同的是,使用 Condition 前,需要先通过锁去创建 Condition .
以 ReentrantLock 中提供的 Condition 为例,首先是创建 Condition 对象:
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
然后是获取锁并调用 await 方法:
new Thread(() -> {
lock.lock();
try {
condition.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
lock.unlock();
}
最后,通过调用 singalAll 唤醒全部阻塞中的线程:
new Thread(() -> {
lock.lock();
condition.signalAll();
lock.unlock();
}
作为接口 Condition 非常惨,因为在Java中只有AQS中的内部类 ConditionObject 实现了 Condition 接口:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
}
static final class Node {
// 省略
}
}
ConditionObject 只有两个 Node 类型的字段,分别是链式结构中的头尾节点, ConditionObject 就是通过它们实现的等待队列。那么 ConditionObject 的等待队列起到了怎样的作用呢?是类似于AQS中的排队机制吗?带着这两个问题,我们正是开始源码的分析.
Condition 接口中定义了4个线程等待的方法:
void await() throws InterruptedException
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
方法虽然很多,但它们之间的差异较小,只体现在时间的处理上,我们看其中最常用的方法:
public final void await() throws InterruptedException {
// 线程中断,抛出异常
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 注释1:加入到Condition的等待队列中
Node node = addConditionWaiter();
// 注释2:释放持有锁(调用AQS的release)
int savedState = fullyRelease(node);
int interruptMode = 0;
// 注释3:判断是否在AQS的等待队列中
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 中断时退出方法
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
break;
}
}
// 加入到AQS的等待队列中,调用AQS的acquireQueued方法
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
// 断开与Condition队列的联系
if (node.nextWaiter != null) {
unlinkCancelledWaiters();
}
if (interruptMode != 0) {
reportInterruptAfterWait(interruptMode);
}
}
注释1的部分,调用 addConditionWaiter 方法添加到 Condition 队列中:
private Node addConditionWaiter() {
// 判断当前线程是否为持有锁的线程
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
// 获取Condition队列的尾节点
Node t = lastWaiter;
// 断开不再位于Condition队列的节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建Node.CONDITION模式的Node节点
Node node = new Node(Node.CONDITION);
if (t == null) {
// 队列为空的场景,将node设置为头节点
firstWaiter = node;
} else {
// 队列不为空的场景,将node添加到尾节点的后继节点上
t.nextWaiter = node;
}
// 更新尾节点
lastWaiter = node;
return node;
}
可以看到, Condition 的队列是一个朴实无华的双向链表,每次调用 addConditionWaiter 方法,都会加入到 Condition 队列的尾部.
注释2的部分, 释放线程持有的锁,同时移出AQS的队列 ,内部调用了AQS的 release 方法:
=final int fullyRelease(Node node) {
try {
int savedState = getState();
if (release(savedState)) {
return savedState;
}
throw new IllegalMonitorStateException();
} catch (Throwable t) {
node.waitStatus = Node.CANCELLED;
throw t;
}
}
因为已经分析过AQS的 release 方法和 ReentrantLock 实现的 tryRelease 方法,这里我们就不过多赘述了.
注释3的部分, isOnSyncQueue 判断当前线程是否在AQS的等待队列中,我们来看此时存在的情况:
isOnSyncQueue
返回 false
,即 线程不在AQS的队列中 ,进入自旋,调用 LockSupport#park
暂停线程; isOnSyncQueue
返回 true
,即 线程在AQS的队列中 ,不进入自旋,执行后续逻辑。 结合注释1和注释2的部分, Condition#await 的实现原理了就很清晰了:
Condition
与AQS分别维护了一个等待队列,而且是互斥的,即 同一个节点只会出现在一个队列中 ; Condition#await
时,将线程添加到 Condition
的队列中(注释1),同时从AQS队列中移出(注释2); Condition
队列中,该线程需要被暂停,调用 LockSupport#park
; 基于以上的结论,我们已经能够猜到唤醒方法 Condition#signalAll 的原理了:
Condition
队列中移出,并添加到AQS的队列中; LockSupport.unpark
唤醒线程。 至于这个猜想是否正确,我们接着来看唤醒方法的实现.
Tips :如果忘记了AQS中相关方法是如何实现的,可以回顾下《 AQS的今生,构建出JUC的基础 》.
来看 signal 和 signalAll 的源码:
// 唤醒一个处于等待中的线程
public final void signal() {
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
// 获取Condition队列中的第一个节点
Node first = firstWaiter;
if (first != null) {
// 唤醒第一个节点
doSignal(first);
}
}
// 唤醒全部处于等待中的线程
public final void signalAll() {
if (!isHeldExclusively()){
throw new IllegalMonitorStateException();
}
Node first = firstWaiter;
if (first != null) {
// 唤醒所有节点
doSignalAll(first);
}
}
两个方法唯一的差别在于头节点不为空的场景下,是调用 doSignal 唤醒一个线程还是调用 doSignalAll 唤醒所有线程:
private void doSignal(Node first) {
do {
// 更新头节点
if ( (firstWaiter = first.nextWaiter) == null) {
// 无后继节点的场景
lastWaiter = null;
}
// 断开节点的连接
first.nextWaiter = null;
// 唤醒头节点
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
private void doSignalAll(Node first) {
// 将Condition的队列置为空
lastWaiter = firstWaiter = null;
do {
// 断开链接
Node next = first.nextWaiter;
first.nextWaiter = null;
// 唤醒当前头节点
transferForSignal(first);
// 更新头节点
first = next;
} while (first != null);
}
可以看到,无论是 doSignal 还是 doSignalAll 都只是将节点移出 Condition 队列,而真正起到唤醒作用的是 transferForSignal 方法,从方法名可以看到该方法是通过“ 转移 ”进行唤醒的,我们来看源码:
final boolean transferForSignal(Node node) {
// 通过CAS替换node的状态
// 如果替换失败,说明node不处于Node.CONDITION状态,不需要唤醒
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
return false;
}
// 将节点添加到AQS的队列的队尾
// 并返回老队尾节点,即node的前驱节点
Node p = enq(node);
int ws = p.waitStatus;
// 对前驱节点状态的判断
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) {
LockSupport.unpark(node.thread);
}
return true;
}
transferForSignal 方法中,调用 enq 方法将 node 重新添加到AQS的队列中,并返回 node 的前驱节点,随后对前驱节点的状态进行判断:
Node.CANCELLED
状态,前驱节点退出锁的争抢, node
可以直接被唤醒; Node.SIGNAL
,设置失败时,直接唤醒 node
。 《 AQS的今生,构建出JUC的基础 》中介绍了 waitStatus 的5种状态,其中 Node.SIGNAL 状态表示需要唤醒后继节点。另外,在分析 shouldParkAfterFailedAcquire 方法的源码时,我们知道在进入AQS的等待队列时,需要将前驱节点的状态更新为 Node.SIGNAL .
最后来看 enq 的实现:
private Node enq(Node node) {
for (;;) {
// 获取尾节点
Node oldTail = tail;
if (oldTail != null) {
// 更新当前节点的前驱节点
node.setPrevRelaxed(oldTail);
// 更新尾节点
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
// 返回当前节点的前驱节点(即老尾节点)
return oldTail;
}
} else {
initializeSyncQueue();
}
}
}
enq 的实现就非常简单了,通过CAS更新AQS的队列尾节点,相当于添加到AQS的队列中,并返回尾节点的前驱节点。好了,唤醒方法的源码到这里就结束了,是不是和我们当初的猜想一模一样呢?
功能上, Condition 实现了AQS版 Object#wait 和 Object#notify ,用法上也与之相似,需要先获取锁,即需要在 lock 与 unlock 之间调用。原理上, 简单来说就是线程在AQS的队列和 Condition 的队列之间的转移 .
假设有线程t已经获取了 ReentrantLock ,线程t1,t2和t3正在AQS的队列中等待,我们可以得到这样的结构:
如果线程t中调用了 Condition#await 方法,线程t进入 Condition 的等待队列中,线程t1获取 ReentrantLock ,并从AQS的队列中移出,结构如下:
如果线程t1中也执行了 Condition#await 方法,同样线程t1进入 Condition 队列中,线程t2获取到 ReentrantLock ,结构如下:
如果线程t2执行了 Condition#signal ,唤醒 Condition 队列中的第一个线程,此时结构如下:
通过上面的流程,我们就可以得到线程是如何在 Condition 队列与AQS队列中转移的:
关于 Condition 的内容到这里就结束了,无论是理解,使用还是剖析原理, Condition 的难度并不高,只不过大家可能平时用得比较少,因此多少有些陌生.
最后,截止到文章发布,我应该是把开头两道题目的 题解 写完了吧~~ 。
好了,今天就到这里了,Bye~~ 。
最后此篇关于17.AQS中的Condition是什么?的文章就讲到这里了,如果你想了解更多关于17.AQS中的Condition是什么?的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我有一个使用 Oracle 高级队列的 JMS op top 的应用程序。我想对显示消息内容的队列表进行查询(在我的情况下是 XML)。因此,当我执行“从 [queue_table] 中选择 user
使用较新版本的 Oracle DB (12.2.0.1),我们开始收到 ORA-00932,因为代码可以很好地与旧的 DB 服务器配合使用。此外,驱动程序报告异常的奇怪细节: ORA-00932: i
源链接:https://www.axa6.com/zh/an-excellent-virtual-machine-memory-architecture 简介 虚拟机内存架构直接影响虚拟机的性能和
我们正在升级我们的基础设施,为此我们正在从 Oracle 10g 迁移到 11g。 我们使用 Oracle GoldenGate 进行数据复制,据我们所知,它不支持 AQ 消息的复制。 对于持久队列,
我在oracle中创建了一个AQ,并用Java编写了2个JMS消费者来监听队列。我有时观察到,如果我在队列中生成一些消息;队列中出队消息的数量大于入队数量。这意味着某些消息会被消费两次。 我创建了具有
我有多个运行的docker(版本18.09.0,构建4d60db4)容器,我希望立即停止它们。 This blog post准确地显示了如何实现此目标,太好了! 我可以使用docker ps -aq列
聊聊JUC包下的底层支撑类-AbstractQueuedSynchronizer(AQS) juc包下的一堆并发工具类是我们日常开发特别是面试中常被拿来问的八股文之一,为了工作也好,为了面试也
这个问题在这里已经有了答案: Docker unknown shorthand flag: 'a' in -aq) (2 个回答) 1年前关闭。 我按照教程 https://phoenixnap.co
我根本不使用高级队列,但 AQ$_PLSQL_NTFNnnnn 调度程序作业的数量不断增长。 目前有 8 个此类职位。正因为如此,我需要刷新最大同时运行作业数。 大约 2 个月前,限制为 10 个还可
我们有一个 Oracle AQ 队列引发事件。 我们有 Java Oracle AQ 客户端处理这些事件。 出于灾难恢复的目的,我们有另一个始终关闭的客户端。但我们也遇到过灾难恢复练习让第二个客户端处
Oracle 10g 中的触发器为常规表中的行子集生成更新插入和删除消息。这些消息由两个字段组成: 唯一的行 ID。 非唯一 ID。 当使用这些消息时,我想对遵守以下约束的双端队列进程施加一个顺序:
我正在使用带有高级队列 (AQ) 和 Java JMS API 的 Oracle 数据库 [11.2]。我当前的 oracle 设置是默认设置,没有额外的调整参数。高层架构: 数据库将消息排入持久队列
1、面试官:如何设计一个秒杀系统?请你阐述流程? 这一面试题答案参考自三太子敖丙的文章:阿里面试官问我:如何设计秒杀系统?我给出接近满分的回答 秒杀系统要解决的几个问题? ① 高并发
我们有一个基于 Oracle AQ 的消息传递系统 - 它运行良好,入队和出队没有任何问题。 现在我们收到了在启动前和运行时添加一些完整性检查的请求,例如“检查所提供的 db-user 的队列是否确实
AQ$_MESSAGES_EXCEPTION首先我知道有这个问题:How to clear a queue in Oracle AQ但它没有答案。 我在 Oracle AQ 的异常队列中有很多消息 (
过去几天我一直在互联网上搜索用于消息入队/双端队列的 Oracle Advanced Queue 的任何正在运行的示例实现,但没有取得任何成功。 我试图遵循 oracle 文档中提到的规范,但我对此不
AQ$_MESSAGES_EXCEPTION首先我知道有这个问题:How to clear a queue in Oracle AQ但它没有答案。 我在 Oracle AQ 的异常队列中有很多消息 (
我是第一次测试 Oracle AQ。我已成功在我创建的队列中创建了 2000 行测试插入。 现在,我想清除这些内容。当我自学时,我将到期时间设置为一个月。我等不了那么久。而且我认为我不应该将它们从队列
我已经设置了单个客户 Oracle AQ。我在 Java Web 应用程序中使用 CLIENT_ACKNOWLEDGE 模式观察来自此队列的消息。但是,一旦我在 onMessage 方法中收到消息,这
我正在编写一个工作流系统,该系统的每一步都完全由明确的人机交互驱动。也就是说,一项任务被分配给一个人,该人从几个有限的选项中进行选择{批准、拒绝、转发},然后将其发送给下一个人或终止。 只是好奇 Or
我是一名优秀的程序员,十分优秀!