你的位置:首页 > Java教程

[Java教程]基于ReentrantLock的AQS的源码分析(独占、非中断、不超时部分)

刚刚看完了并发实践这本书,算是理论具备了,看到了AQS的介绍,再看看源码,发现要想把并发理解透还是很难得,花了几个小时细分析了一下把可能出现的场景尽可能的往代码中去套,还是有些收获,但是真的很费脑,还是对多线程的理解太浅了,不多说了,直接上代码吧。

这段代码不是为跑通,只是把AQS,ReentrantLock中的部分源码合并到了一起,便于理解。

 1 package com.yb.interview.concurrent; 2  3  4 import java.util.concurrent.locks.LockSupport; 5  6 public class AQSSourceStudy { 7  8   abstract static class AQS { 9     /** 10      * 这个状态是有子类来维护的,AQS不会用这个状态做什么 11     */ 12     private volatile int state; 13     /** 14      * 队尾节点 15     */ 16     private volatile Node tail; 17     /** 18      * 可能情况 19     */ 20     private volatile Node head; 21     /** 22      * 独占线程 23     */ 24     private Thread exclusiveOwnerThread; 25  26  27     /** 28      * 由子类实现 29      * 判断当前线程是否需要排队 30     */ 31     abstract boolean tryAcquire(int i); 32  33     public int getState() { 34       return state; 35     } 36  37     public void setState(int state) { 38       this.state = state; 39     } 40  41     /** 42      * 主方法 43      * 可能的情况 44      * 当前状态可以直接运行 45      * 当前状态要放入队列里等待 46      * 状态->子类获取 47      * 过程,尽可能的不要去阻塞,循环多次,竞争多次 48      * 创建节点 49      * 节点入队,队尾 50      * 判断新节点的前一个节点的状态,更新,前一个节点,因为在入队的过程中每个节点的状态是动的 51      * 最后,阻塞当前线程 52     */ 53     public final void acquire(int arg) { 54       if (!tryAcquire(arg) && 55           acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 56         // 中断状态传播 57         // 实时或者将来阻塞,抛中断异常 58         selfInterrupt(); 59     } 60  61     /** 62      * 当有新节点入队时,循环的把新节点关联到一个有效节点的后面 63      * 然后,阻塞这个节点的线程(当前线程) 64     */ 65     private boolean acquireQueued(Node node, int arg) { 66       boolean failed = true; 67       try { 68         boolean interrupted = false; 69         for (; ; ) { 70           final Node p = node.predecessor(); 71           // 新节点的前个节点是头结点,如果头结点的线程释放,新节点接可以直接执行 72           // 所有不要着急阻塞,在判断一次,头结点释放没有,如果头结点释放,新节点不阻塞,把新节点设为头结点 73           // 当新节点没有排队直接运行了,之后要将节点标记为无效 cancelAcquire 74           if (p == head && tryAcquire(arg)) { 75             // 想了很久这段代码发生的情况 76             // 这段代码发生的情况 77             // 1.node在入队列时,有不同的线程在获得了锁,且队列中没有节点 78             // 2.当执行到这里再次tryAcquire之前,之前释放了锁 79             // 3.这时hasQueuedPredecessors中的判断,头结点的后一个节点,是新建的这个节点,满足s.thread==Thread.currentThread(不考虑这时有其他线程进入,或者进入无效) 80             // 满足了tryAcquire返回true的情况 81             // 将头结点改为新节点 82             /**** 83              * head     tail 84              * |        | 85              * |        | 86              * ----------  --------- 87              * nullNode   newNode 88              * ---------   ---------- 89              * next=newNode prev=nullNode 90              * prev=null   next=null 91              * -------    ---------- 92              * 93              * 改完后 94              * 95              *       head tail 96              *        |  | 97              *        |  | 98              * ---------  --------- 99              * nullNode   newNode100              * ---------   ---------101              * next=newNode prev=nullNode102              * prev=null   next=null103              * ---------   ----------104              * */105 106             setHead(node);107             p.next = null;108             failed = false;109             return interrupted;110           }111           // 之前的节点不是正在执行线程的节点,调整位置和状态再阻塞112           // 在线程解除阻塞后,使者节点失效113           if (shouldParkAfterFailedAcquire(p, node) &&114               parkAndCheckInterrupt())115             interrupted = true;116         }117       } finally {118         if (failed)119           // 节点解除阻塞后,可能是中断或者超时120           // 非unlock的解锁121           cancelAcquire(node);122       }123     }124 125     private void cancelAcquire(Node node) {126       if (node == null)127         return;128       node.thread = null;129       Node pred = node.prev;130       // 那个空的节点会保证终止131       while (pred.waitStatus > 0)132         // 将节点的prev关联到最近的有效节点133         node.prev = pred = pred.prev;134       Node predNext = pred.next;135       // 任何情况都执行的136       node.waitStatus = Node.CANCELLED;137 138       // 如果取消的节点是队尾节点,并且将前节点设为队尾节点139       if (node == tail && compareAndSetTail(node, pred)) {140         // cancel的节点和cancel之前的无效节点会移出队列141         compareAndSetNext(pred, predNext, null);142       } else {143         // 如果不是队尾节点144         int ws;145         if (pred != head &&146             ((ws = pred.waitStatus) == Node.SIGNAL ||147                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&148             pred.thread != null) {149           Node next = node.next;150           if (next != null && next.waitStatus <= 0)151             // prev->node->next 改为 prev->next152             compareAndSetNext(pred, predNext, next);153         } else {154           // 判断锁定的状态155           // 如果前节点是头结点,或者不是SIGNAL状态并且无法设置为SIGNAL状态156           // 总结,取消一个节点是,要保证这个节点能被释放,要不通过前节点通知,在锁锁,对应release157           unparkSuccessor(node);158         }159 160         node.next = node; // help GC161       }162     }163 164     private void unparkSuccessor(Node node) {165       // 解锁节点的线程166       // 当node时头节点时,是当前获取线程释放的炒作167       // 不是偷节点168       int ws = node.waitStatus;169       if (ws < 0)170         // 不用再去通知下个节点了,即将释放node了171         compareAndSetWaitStatus(node, ws, 0);172       Node s = node.next;173       if (s == null || s.waitStatus > 0) {174         s = null;175         // 从队尾向前找到最前有效的节点176         for (Node t = tail; t != null && t != node; t = t.prev)177           if (t.waitStatus <= 0)178             s = t;179       }180       if (s != null)181         LockSupport.unpark(s.thread);182 183     }184 185     private void compareAndSetNext(Node pred, Node predNext, Object o) {186 187     }188 189     private boolean parkAndCheckInterrupt() {190       // 阻塞191       LockSupport.park(this);192       // 当前前程标记中断193       return Thread.interrupted();194     }195 196     private boolean shouldParkAfterFailedAcquire(Node pred, Node node) {197       int ws = pred.waitStatus;198       // 如果前节点是需要被通知的,前节点正在被阻塞,阻塞当先线程199       if (ws == Node.SIGNAL)200         return true;201       // 如果前节点是无效的,找到最近的一个有效节点,并关联,返回,在外部调用方法中会再次调用这个方法202       if (ws > 0) {203         do {204           node.prev = pred = pred.prev;205         } while (pred.waitStatus > 0);206         // 这是个切断调用链的过程207         pred.next = node;208       } else {209         // 更新前节点的状态,释放时通知新节点210         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);211       }212       return false;213     }214 215     /**216      * 创建节点217      * 节点入队218      *219      * @return 新节点220     */221     private Node addWaiter(Node mode) {222       Node node = new Node(Thread.currentThread(), mode);223       Node pred = tail;224       // 之前有节点在队列中225       if (pred != null) {226         node.prev = pred;227         // 直接修改队尾,不成功要进入接下类的循环,循环中也有类型的判断,这里添加会减少一些逻辑(这样说可能是理解的有偏差)228         if (compareAndSetTail(pred, node)) {229           pred.next = node;230           return node;231         }232       }233       enq(node);234       return node;235     }236 237     /**238      * 节点入队239      * 循环,直到把新节点放到队尾,在多线程中这个过程是不确定的240     */241     private Node enq(Node node) {242       for (; ; ) {243         Node t = tail;244         // Must initialize245         // 队尾没值,新节点是第一个入队的节点,创建一个空的节点,头尾都指向这个空节点246         if (t == null) {247           if (compareAndSetHead(new Node()))248             tail = head;249         } else {250           node.prev = t;251           if (compareAndSetTail(t, node)) {252             t.next = node;253             return t;254           }255         }256       }257     }258 259     /**260      * 字面理解,是否有已经排队的线程261      * 实际意义,有重入锁的情况,在这里要考虑到262      * 没有节点在排队的情况,头结点与未节点是相同的263      * 判断重入,当前线程是头结点的线程.264     */265     protected boolean hasQueuedPredecessors() {266       Node t = tail;267       Node h = head;268       Node s;269       //为什么是头结点的线程,而不是exclusiveOwnerThread,因为只有在270       // 当前队列里没有值得时候才回设置独占线程,如果是通过节点释放的线271       // 程还会和节点绑定,不会映射到exclusiveOwnerThread272       return h != t &&273           ((s = h.next) == null || s.thread != Thread.currentThread());274     }275 276     public final boolean release(int arg) {277       if (tryRelease(arg)) {278         Node h = head;279         // 在独占锁的时候,waitStatus只能为0 -1 -2 -3280         // 这个里不为0代表头节点是空节点281         // 空节点不需要释放282         // 头节点是释放锁的时候,最先被考虑的283         if (h != null && h.waitStatus != 0)284           unparkSuccessor(h);285         return true;286       }287       return false;288     }289 290     protected abstract boolean tryRelease(int arg);291 292 293     public void setHead(Node head) {294       this.head = head;295     }296 297     private boolean compareAndSetHead(Node node) {298       return (true || false);299     }300 301     private boolean compareAndSetTail(Node pred, Node node) {302       return (true || false);303     }304 305     protected void selfInterrupt() {306       Thread.currentThread().interrupt();307     }308 309 310     /**311      * CAS更新队列状态,CAS的问题在其他的机会介绍312     */313     boolean compareAndSetState(int o, int n) {314       return (false || true);315     }316 317     /**318      * 独占线程标记改为指定线程319     */320     void setExclusiveOwnerThread(Thread t) {321       exclusiveOwnerThread = t;322     }323 324     /**325      * 返回独占线程326     */327     Thread getExclusiveOwnerThread() {328       return exclusiveOwnerThread;329     }330 331     // 修改节点的状态332     private boolean compareAndSetWaitStatus(Node pred, int ws, int signal) {333       return (true || false);334     }335 336     static class Node {337 338       public int waitStatus;339 340       Node() {341       }342 343       /**344        * @param thread345        * @param mode  SHARED or EXCLUSIVE346       */347       Node(Thread thread, Node mode) {348         this.thread = Thread.currentThread();349         this.mode = mode;350       }351 352       // 共享模式标记353       static final Node SHARED = new Node();354       // 独占模式标记355       static final Node EXCLUSIVE = null;356 357       // 节点被取消,因为超时或者中断358       static final int CANCELLED = 1;359       // next被阻塞,当节点释放时,notice next360       static final int SIGNAL = -1;361       // 在条件队列中,等待某个条件被阻塞362       static final int CONDITION = -2;363       // 节点在共享模式下,可以传播锁364       static final int PROPAGATE = -3;365 366       volatile Node next;367       volatile Node prev;368       Node mode;369 370       public Thread thread;371 372       public Node predecessor() {373         Node p = prev;374         if (p == null)375           throw new NullPointerException();376         else377           return p;378       }379     }380 381 382   }383 384   /**385    * 这是一个独占锁的实现,从ReentrantLock中粘贴出来的部分代码386   */387   class SYC extends AQS {388 389     public void lock() {390       acquire(1);391     }392 393     public void unlock() {394       release(1);395     }396 397     protected final boolean tryAcquire(int acquires) {398       final Thread current = Thread.currentThread();399       int c = getState();400       // 如果当前的状态401       if (c == 0) {402         if (!hasQueuedPredecessors() &&403             compareAndSetState(0, acquires)) {404           setExclusiveOwnerThread(current);405           return true;406         }407       } else if (current == getExclusiveOwnerThread()) {408         int nextc = c + acquires;409         if (nextc < 0)410           throw new Error("Maximum lock count exceeded");411         setState(nextc);412         return true;413       }414       return false;415     }416 417     protected final boolean tryRelease(int releases) {418       int c = getState() - releases;419       if (Thread.currentThread() != getExclusiveOwnerThread())420         throw new IllegalMonitorStateException();421       boolean free = false;422       if (c == 0) {423         free = true;424         setExclusiveOwnerThread(null);425       }426       setState(c);427       return free;428     }429 430 431   }432 }