gpt4 book ai didi

AQS实现原理

转载 作者:我是一只小鸟 更新时间:2023-07-13 22:31:49 25 4
gpt4 key购买 nike

在java.util.concurrent包中,我们经常会使用ReentrantLock,CyclicBarrier等工具类,但是我们往往对其内部的实现原理却并不知晓.

本篇文章主要对上述工具类的核心实现AQS进行剖析,分析原理可以让我们学习到大神的代码设计思维.

文章将从一下几个方面分析:

1.AQS是什么?

AbstractQueuedSynchronizer类就是我们通常说的AQS,抽象的队列同步器.

其实我们学过操作系统原理都知道,所谓的同步,指的是多线程场景下通过某种机制,保证某段代码执行是线程独享的, 。

我们把这段代码叫同步块,而把这种机制叫同步.

在JAVA中,传统的方式是使用synchronized关键字来实现同步,那么其底层是基于C++实现的ObjectMonitor.

今天我们讨论的AQS是JDK1.5之后,提供的一个能实现同步功能的抽象类.

通过该类的注释,我们可以了解到其内部采用了FIFO队列的数据结构来实现,互斥场景的资源申请和释放的实现如下所示:

                          
                               Acquire:
       
                          
                          
                            while
                          
                           (!
                          
                            tryAcquire(arg)) {
          enqueue thread 
                          
                          
                            if
                          
                          
                             it is not already queued;
          possibly block current thread;
       }
  
   Release:
       
                          
                          
                            if
                          
                          
                             (tryRelease(arg))
          unblock the first queued thread;
                          
                        

而tryAquire()和tryRelease()方法都是需要子类去实现的.

换句话说,如果要使用AQS,那么只需要继承,然后实现如下方法来自定义资源获取和释放的逻辑就行了.

FIFO队列的节点使用内部类Node来描述:

                          
                            static
                          
                          
                            final
                          
                          
                            class
                          
                          
                             Node {

        
                          
                          
                            //
                          
                          
                             节点状态
                          
                          
                            volatile
                          
                          
                            int
                          
                          
                             waitStatus;

        
                          
                          
                            //
                          
                          
                             上一个节点
                          
                          
                            volatile
                          
                          
                             Node prev;

        
                          
                          
                            //
                          
                          
                             下一节点
                          
                          
                            volatile
                          
                          
                             Node next;

        
                          
                          
                            //
                          
                          
                             需要排队的线程
                          
                          
                            volatile
                          
                          
                             Thread thread;
    }
                          
                        

其他代码我先省略.

                          
                            public
                          
                          
                            abstract
                          
                          
                            class
                          
                          
                             AbstractQueuedSynchronizer
    
                          
                          
                            extends
                          
                          
                             AbstractOwnableSynchronizer
    
                          
                          
                            implements
                          
                          
                             java.io.Serializable {
    
                          
                          
                            //
                          
                          
                             双向链表头节点
                          
                          
                            private
                          
                          
                            transient
                          
                          
                            volatile
                          
                          
                             Node head;

    
                          
                          
                            //
                          
                          
                             双向链表为节点
                          
                          
                            private
                          
                          
                            transient
                          
                          
                            volatile
                          
                          
                             Node tail;

    
                          
                          
                            //
                          
                          
                             资源
                          
                          
                            private
                          
                          
                            volatile
                          
                          
                            int
                          
                          
                             state;
}
                          
                        

  。

2.申请互斥资源的源码分析。

2.1 分析acquire()方法的逻辑

                          
                            public
                          
                          
                            final
                          
                          
                            void
                          
                           acquire(
                          
                            int
                          
                          
                             arg) {
        
                          
                          
                            if
                          
                           (!tryAcquire(arg) &&
                          
                            
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
                          
                        

tryAcuire()方法可以理解成获取资源,由子类实现,我们不关注.

我把代码写成下面这样,可能好理解一些:

                          
                            public
                          
                          
                            final
                          
                          
                            void
                          
                           acquire(
                          
                            int
                          
                          
                             arg) {
    
                          
                          
                            if
                          
                          
                            (tryAcquire(arg)) {
        
                          
                          
                            return
                          
                          
                            ;
    }
    Node n 
                          
                          =
                          
                             addWaiter(Node.EXCLUSIVE);
    
                          
                          
                            if
                          
                          
                            (acquireQueued(n, arg))
        selfInterrupt();
}
                          
                        

  。

                          
                            public
                          
                          
                            final
                          
                          
                            void
                          
                           acquire(
                          
                            int
                          
                          
                             arg) {
    
                          
                          
                            //
                          
                          
                             如果获取资源成功,直接返回
                          
                          
                            if
                          
                          
                            (tryAcquire(arg)) {
        
                          
                          
                            return
                          
                          
                            true
                          
                          
                            ;
    }
    
                          
                          
                            //
                          
                          
                             未获取到资源,将自己封装成一个node添加到队列尾部
                          
                          
    Node n =
                          
                             addWaiter(Node.EXCLUSIVE);
    
                          
                          
                            //
                          
                          
                             有点复杂,下面慢慢分析
                          
                          
                            if
                          
                          
                            (acquireQueued(n, arg)) {
        
                          
                          
                            //
                          
                          
                             自我中断 先不关注。
                          
                          
                                    selfInterrupt();
    }
}
                          
                        

起码,我们应该知道一点,该方法一旦返回了,那么就意味着可以进入同步块代码执行了.

2.2 分析addWaiter()方法

                          
                            private
                          
                          
                             Node addWaiter(Node mode) {
    
                          
                          
                            //
                          
                          
                             以互斥模式创建一个node,waitStatus是0
                          
                          
    Node node = 
                          
                            new
                          
                          
                             Node(Thread.currentThread(), mode);
    
                          
                          
                            //
                          
                          
                             其实该段代码跟下面的enq方法差不多
                          
                          
    Node pred =
                          
                             tail;
    
                          
                          
                            //
                          
                          
                             如果该队列已经有为节点
                          
                          
                            if
                          
                           (pred != 
                          
                            null
                          
                          
                            ) {
        
                          
                          
                            //
                          
                          
                             当前node上一个node指向为尾节点
                          
                          
        node.prev =
                          
                             pred;
        
                          
                          
                            //
                          
                          
                             cas修改尾节点
                          
                          
                            if
                          
                          
                             (compareAndSetTail(pred, node)) {
            pred.next 
                          
                          =
                          
                             node;
            
                          
                          
                            return
                          
                          
                             node;
        }
    }
    
                          
                          
                            //
                          
                          
                             死循环保证添加成功
                          
                          
                                enq(node);
    
                          
                          
                            return
                          
                          
                             node;
}



                          
                          
                            private
                          
                           Node enq(
                          
                            final
                          
                          
                             Node node) {
    
                          
                          
                            for
                          
                          
                             (;;) {
        Node t 
                          
                          =
                          
                             tail;
        
                          
                          
                            if
                          
                           (t == 
                          
                            null
                          
                          ) { 
                          
                            //
                          
                          
                             Must initialize
            
                          
                          
                            //
                          
                          
                             如果当前队列没有节点,创建一个虚拟节点作为头和尾,该节点的thread == null, waitStatus是0
            
                          
                          
                            //
                          
                          
                             通过cas操作保证只有一个线程修改成功
                          
                          
                            if
                          
                           (compareAndSetHead(
                          
                            new
                          
                          
                             Node()))
                tail 
                          
                          =
                          
                             head;
        } 
                          
                          
                            else
                          
                          
                             {
            node.prev 
                          
                          =
                          
                             t;
            
                          
                          
                            if
                          
                          
                             (compareAndSetTail(t, node)) {
                t.next 
                          
                          =
                          
                             node;
                
                          
                          
                            return
                          
                          
                             t;
            }
        }
    }
}
                          
                        

其实上述方法就是保证了并发情况下node一定能正确加入到node中, 而且如果是空链表,会增加一个虚拟的head节点 .

图解一下:

(创建head虚拟节点) 。

  。

 将新节点指向tail 。

 通过cas操作修改tail指向新节点 。

 如果修改成功,将修改前的tail的next指向新节点 。

  。

2.3 分析aquireQueued方法

                          
                            //
                          
                          
                             看这个方法的时候我建议不要关注其中的临时变量

                          
                          
                            //
                          
                          
                             我们只要知道,这个方法里面有个死循环,不管怎样,只有return了,才能执行业务中定义的同步代码块。
                          
                          
                            final
                          
                          
                            boolean
                          
                           acquireQueued(
                          
                            final
                          
                           Node node, 
                          
                            int
                          
                          
                             arg) {
    
                          
                          
                            boolean
                          
                           failed = 
                          
                            true
                          
                          
                            ;
    
                          
                          
                            try
                          
                          
                             {
        
                          
                          
                            boolean
                          
                           interrupted = 
                          
                            false
                          
                          
                            ;
        
                          
                          
                            for
                          
                          
                             (;;) {
            
                          
                          
                            final
                          
                           Node p =
                          
                             node.predecessor();
            
                          
                          
                            if
                          
                           (p == head &&
                          
                             tryAcquire(arg)) {
                setHead(node);
                p.next 
                          
                          = 
                          
                            null
                          
                          ; 
                          
                            //
                          
                          
                             help GC
                          
                          
                failed = 
                          
                            false
                          
                          
                            ;
                
                          
                          
                            //
                          
                          
                             唯一的返回点
                          
                          
                            return
                          
                          
                             interrupted;
            }
            
                          
                          
                            if
                          
                           (shouldParkAfterFailedAcquire(p, node) &&
                          
                            
                parkAndCheckInterrupt())
                interrupted 
                          
                          = 
                          
                            true
                          
                          
                            ;
        }
    } 
                          
                          
                            finally
                          
                          
                             {
        
                          
                          
                            if
                          
                          
                             (failed)
            cancelAcquire(node);
    }
}


                          
                          
                            //
                          
                          
                             我把代码改写成下面这样 可能会好理解一些
                          
                          
                            final
                          
                          
                            boolean
                          
                           acquireQueued(
                          
                            final
                          
                           Node node, 
                          
                            int
                          
                          
                             arg) {
    
                          
                          
                            boolean
                          
                           failed = 
                          
                            true
                          
                          
                            ;
    
                          
                          
                            try
                          
                          
                             {
        
                          
                          
                            boolean
                          
                           interrupted = 
                          
                            false
                          
                          
                            ;
        
                          
                          
                            for
                          
                          
                             (;;) {
            
                          
                          
                            final
                          
                           Node p =
                          
                             node.predecessor();
            
                          
                          
                            //
                          
                          
                             只有前继节点是head的时候具有获取资源的资格,如果获取成功则直接将当前node设置成head
                          
                          
                            if
                          
                           (p == head &&
                          
                             tryAcquire(arg)) {
                
                          
                          
                            //
                          
                          
                             可能在排队后未阻塞前执行,也可能在阻塞被唤醒后执行
                          
                          
                                            setHead(node);
                p.next 
                          
                          = 
                          
                            null
                          
                          ; 
                          
                            //
                          
                          
                             help GC
                          
                          
                failed = 
                          
                            false
                          
                          
                            ;
                
                          
                          
                            //
                          
                          
                             唯一的返回点
                          
                          
                            return
                          
                          
                             interrupted;
            }
            
                          
                          
                            //
                          
                          
                             判断是否该阻塞自己
                          
                          
                            if
                          
                           (!
                          
                            shouldParkAfterFailedAcquire(p, node)){
               
                          
                          
                            continue
                          
                          
                            ;
            }
            
                          
                          
                            //
                          
                          
                             进行阻塞,并且在被唤醒之后返回线程的中断状态
                          
                          
                            if
                          
                          (!
                          
                            parkAndCheckInterrupt()) {
                
                          
                          
                            continue
                          
                          
                            ;
            }
            interrupted 
                          
                          = 
                          
                            true
                          
                          
                            ;
        }
    } 
                          
                          
                            finally
                          
                          
                             {
        
                          
                          
                            //
                          
                          
                             先别关注。
                          
                          
                            if
                          
                          
                             (failed)
            cancelAcquire(node);
    }
}
                          
                        

总之,一旦该方法返回了,就意味着线程获取资源成功了.

下面图解获取资源成功后,做的修改:

  。

  。

  。

 当然这里的waitStatus不一定是0.

2.4 分析shouldParkAfterFailedAcquire()方法

                          
                            private
                          
                          
                            static
                          
                          
                            boolean
                          
                          
                             shouldParkAfterFailedAcquire(Node pred, Node node) {
    
                          
                          
                            int
                          
                           ws =
                          
                             pred.waitStatus;
    
                          
                          
                            //
                          
                          
                             如果上一节点状态是-1,直接返回true
                          
                          
                            if
                          
                           (ws ==
                          
                             Node.SIGNAL)
        
                          
                          
                            //
                          
                          
                             返回,并触发外层阻塞线程
                          
                          
                            return
                          
                          
                            true
                          
                          
                            ;
    
                          
                          
                            if
                          
                           (ws > 0
                          
                            ) {
        
                          
                          
                            //
                          
                          
                             如果上一个节点状态大于0,其实就是1表示被取消了
        
                          
                          
                            //
                          
                          
                             往前找waitStatus正常的节点
                          
                          
                            do
                          
                          
                             {
            node.prev 
                          
                          = pred =
                          
                             pred.prev;
        } 
                          
                          
                            while
                          
                           (pred.waitStatus > 0
                          
                            );
        pred.next 
                          
                          =
                          
                             node;
    } 
                          
                          
                            else
                          
                          
                             {
        
                          
                          
                            //
                          
                          
                             cas修改上一个节点为状态为-1
                          
                          
                                    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    
                          
                          
                            //
                          
                          
                             返回进入下一个循环
                          
                          
                            return
                          
                          
                            false
                          
                          
                            ;
}
                          
                        

其实逻辑很简单,就是修改自己的prev指向往前寻找的有效节点,并在阻塞前,将prev指向的有效节点waitStatus设置为-1.

问题来了?这个代码没有并发问题吗?

                          
                            do
                          
                          
                             {
            node.prev 
                          
                          = pred =
                          
                             pred.prev;
        } 
                          
                          
                            while
                          
                           (pred.waitStatus > 0
                          
                            );
        pred.next 
                          
                          = node;
                        

答案是没有并发问题,因为每个节点都是往前寻找,理论上讲每个线程遍历的节点不一样.

我们来图解一下这个循环修改的过程:

假设线程A(waitStatus = 0),线程B(waitStatus = 1),线程C(waitStatus = 0 为当前线程) 。

 下图标识了循环往前探测并修改引用关系 。

 下图就是修改了pred节点的waitStatus = -1 (下一次循环到来的时候) 。

  。

此时中间线程B的节点其实没有引用再指向他了.

如果成功修改线程A的waitStatus为-1之后,下一次循环到来,该方法就返回了true,线程就阻塞了.

 其实看到这里, 我们可以将虚拟的head节点就当做是正在使用资源的线程表示(个人观点哈) .

 互斥资源申请的源码分析就结束了.

做一个小总结:

                          
                            public
                          
                          
                            final
                          
                          
                            void
                          
                           acquire(
                          
                            int
                          
                          
                             arg) {
    
                          
                          
                            //
                          
                          
                             如果争抢资源成功直接返回直接业务的同步代码块
                          
                          
                            if
                          
                          
                             (tryAcquire(arg)) {
        
                          
                          
                            return
                          
                          
                            ;
    }
    
                          
                          
                            //
                          
                          
                             循环+cas保证向队列中添加当前线程的node成功
                          
                          
    Node n =
                          
                             addWaiter(Node.EXCLUSIVE);
    
                          
                          
                            //
                          
                          
                             死循环,争抢资源,或者阻塞,或者唤醒之后继续争抢资源,直到抢资源成功后返回。
                          
                          
                            if
                          
                          
                            (acquireQueued(n, arg)) {
        selfInterrupt();
    }
    
                          
                          
                            //
                          
                          
                             未被中断的抢到资源
                          
                          
}
                        

3.释放互斥资源的源码分析。

我们直接分析release()方法:

                          
                            public
                          
                          
                            final
                          
                          
                            boolean
                          
                           release(
                          
                            int
                          
                          
                             arg) {
    
                          
                          
                            if
                          
                          
                             (tryRelease(arg)) {
        
                          
                          
                            //
                          
                          
                             一旦释放资源,此处就需要考虑并发问题了
                          
                          
        Node h =
                          
                             head;
        
                          
                          
                            //
                          
                          
                             head节点是虚拟节点,不可能是取消状态所以这里的判断可以理解为
        
                          
                          
                            //
                          
                          
                             头结点不为空而且头结点的waitStatus = -1执行unparkSuccessor方法
                          
                          
                            if
                          
                           (h != 
                          
                            null
                          
                           && h.waitStatus != 0
                          
                            )
            unparkSuccessor(h);
        
                          
                          
                            //
                          
                          
                             h == null 或者 h.waitStatus = 0说明没有后继节点需要唤醒,
        
                          
                          
                            //
                          
                          
                             如果此时正好head后面一个node正在试图修改head的状态改成-1是能改成功的
        
                          
                          
                            //
                          
                          
                             但是由于之前分析的acquireQueued方法是一个死循环,哪怕head被修改成-1,
        
                          
                          
                            //
                          
                          
                             但是由于该循环会先抢锁所以也就不存在线程改了状态会park的问题。
        
                          
                          
                            //
                          
                          
                             h == null说明并无线程参与竞争
                          
                          
                            return
                          
                          
                            true
                          
                          
                            ;
    }
    
                          
                          
                            return
                          
                          
                            false
                          
                          
                            ;
}
                          
                        

  。

其实关键就是unparkSuccessor()方法 。

                          
                            private
                          
                          
                            void
                          
                          
                             unparkSuccessor(Node node) {
    
                          
                          
                            //
                          
                          
                             进入该方法说明资源已经被释放了。
                          
                          
                            int
                          
                           ws =
                          
                             node.waitStatus;
    
                          
                          
                            //
                          
                          
                             如果ws小于0,修改为0
                          
                          
                            if
                          
                           (ws < 0
                          
                            )
        compareAndSetWaitStatus(node, ws, 
                          
                          0
                          
                            );
    
                          
                          
                            //
                          
                          
                             传入node就是head
    
                          
                          
                            //
                          
                          
                             s指向head下一节点
                          
                          
    Node s =
                          
                             node.next;
    
                          
                          
                            //
                          
                          
                             如果没有后继节点说明有线程已经抢到资源
    
                          
                          
                            //
                          
                          
                             如果后继节点被取消了
                          
                          
                            if
                          
                           (s == 
                          
                            null
                          
                           || s.waitStatus > 0
                          
                            ) {
        
                          
                          
                            //
                          
                          
                             假设没有节点需要唤醒
                          
                          
        s = 
                          
                            null
                          
                          
                            ;
        
                          
                          
                            //
                          
                          
                             从后往前找,找到距离head最近的节点唤醒
                          
                          
                            for
                          
                           (Node t = tail; t != 
                          
                            null
                          
                           && t != node; t =
                          
                             t.prev)
            
                          
                          
                            if
                          
                           (t.waitStatus <= 0
                          
                            )
                s 
                          
                          =
                          
                             t;
    }
    
                          
                          
                            if
                          
                           (s != 
                          
                            null
                          
                          
                            )
        LockSupport.unpark(s.thread);
}
                          
                        

其实执行该方法的时候是存在并发的情况的.

我理解这个地方从后往前找是为啥呢?能不能从前往后找?

  。

这个问题,我也不知道。。。.

  。

最后此篇关于AQS实现原理的文章就讲到这里了,如果你想了解更多关于AQS实现原理的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com