你的位置:首页 > Java教程

[Java教程]Java并发编程总结3——AQS、ReentrantLock、ReentrantReadWriteLock


本文内容主要总结自《Java并发编程的艺术》第5章——Java中的锁。

 

一、AQS

AbstractQueuedSynchronizer(简称AQS),队列同步器,是用来构建锁或者其他同步组建的基础框架。该类主要包括:

1、模式,分为共享和独占。

2、volatile int state,用来表示锁的状态。

3、FIFO双向队列,用来维护等待获取锁的线程。

AQS部分代码及说明如下:

public abstract class AbstractQueuedSynchronizer  extends AbstractOwnableSynchronizer  implements java.io.Serializable {  static final class Node {    /** 共享模式,表示可以多个线程获取锁,比如读写锁中的读锁 */    static final Node SHARED = new Node();    /** 独占模式,表示同一时刻只能一个线程获取锁,比如读写锁中的写锁 */    static final Node EXCLUSIVE = null;    volatile Node prev;    volatile Node next;    volatile Thread thread;  }  /** AQS类内部维护一个FIFO的双向队列,负责同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等    构造成一个节点Node并加入同步队列;当同步状态释放时,会把首节点中线程唤醒,使其再次尝试同步状态 */  private transient volatile Node head;  private transient volatile Node tail;  /** 状态,主要用来确定lock是否已经被占用;在ReentrantLock中,state=0表示锁空闲,>0表示锁已被占用;可以自定义,改写tryAcquire(int acquires)等方法即可 */  private volatile int state;}

这里主要说明下双向队列,通过查看源码分析,队列是这个样子的:

head -> node1 -> node2 -> node3(tail)

注意:head是一个空节点(所谓的空节点意思是节点中没有具体的线程信息),因此实际上head->next(即node1)才是队列中第一个可用节点。

AQS的设计基于模版方法模式,使用者通过继承AQS类并重写指定的方法,可以实现不同功能的锁。可重写的方法主要包括:

 

 

二、通过ReentrantLock学习AQS的使用

1、公平锁的获取

/** * Sync object for fair locks */static final class FairSync extends Sync {  private static final long serialVersionUID = -3000897897090466540L;  final void lock() {    acquire(1);  }  /**   * 首先尝试获取锁,如果tryAcquire(arg)返回true,获取锁成功;   * 如果失败,则调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg),将当前线程封装成Node节点加入到同步队列队尾,之后阻塞当前线程   */  public final void acquire(int arg) {    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))      selfInterrupt();  }  /**   * 获取state的值,如果等于0表示锁空闲,可以尝试获取;   * 查看当前线程是否是FIFO队列中的第一个可用节点,如果是第一个,则尝试通过CAS方式获取锁, 这保证了等待时间最长的必定先获取锁   */  protected final boolean tryAcquire(int acquires) {    final Thread current = Thread.currentThread();    int c = getState();    if (c == 0) {      if (!hasQueuedPredecessors() &&        compareAndSetState(0, acquires)) {        setExclusiveOwnerThread(current);        return true;      }    }    else if (current == getExclusiveOwnerThread()) {      int nextc = c + acquires;      if (nextc < 0)        throw new Error("Maximum lock count exceeded");      setState(nextc);      return true;    }    return false;  }}

2、公平锁的释放

更新状态值state,之后将第一个可用节点从同步队列中删除即可。

 

三、公平锁和非公平锁

ReentrantLock默认的锁为非公平锁,其主要原因在于:与公平锁相比,可以避免大量的线程切换,极大的提高性能。

先看一个非公平锁的例子: 

public class AQS2 {  private ReentrantLock lock = new ReentrantLock(false);  private Thread[] threads = new Thread[3];  public AQS2() {    for (int i = 0; i < 3 ; i++) {      threads[i] = new Thread(new Runnable() {        public void run() {          for (int i = 0; i < 2; i++) {            try {              lock.lock();              Thread.sleep(100);              System.out.println(Thread.currentThread().getName());            } catch (InterruptedException e) {              e.printStackTrace();            } finally {              lock.unlock();            }          }        }      });    }  }  public void startThreads() {    for (Thread thread : threads) {      thread.start();    }  }  public static void main(String[] args) {    AQS2 aqs2 = new AQS2();    aqs2.startThreads();  }}

运行结果为:

 

这段代码(每个线程2次获取锁/释放锁)的运行结果我一开始没有想清楚,之前我是这么想的:

Thread0先获取锁,之后sleep 100ms,那么等待获取锁的同步队列为:

head -> thread1 -> thread2 -> thread0 -> thread1 -> thread2。

从运行结果可知,第二次获取锁的还是thread0,但是锁的释放release(int args)却总是从同步队列的第一个可用节点开始,那就把thread1从队列中移除了,逻辑明显不对了。

后来重新看了代码,比较了非公平锁和公平锁之间的不同时,才终于明白。

非公平锁获取锁最大的不一样的地方在于:线程可以无视sync同步队列插队!一旦插队成功,获得了锁,那么该线程当然也就不用在排队了。所以以上程序的同步队列应该为:

head -> thread1 -> thread2。

非公平锁源代码主要的不同点有2点:

static final class NonfairSync extends Sync {  private static final long serialVersionUID = 7316153563782823691L;    //不同点1  final void lock() {    if (compareAndSetState(0, 1))      setExclusiveOwnerThread(Thread.currentThread());    else      acquire(1);  }  protected final boolean tryAcquire(int acquires) {    return nonfairTryAcquire(acquires);  }  final boolean nonfairTryAcquire(int acquires) {    final Thread current = Thread.currentThread();    int c = getState();    if (c == 0) {      if (compareAndSetState(0, acquires)) {    //不同点2        setExclusiveOwnerThread(current);        return true;      }    }    else if (current == getExclusiveOwnerThread()) {      int nextc = c + acquires;      if (nextc < 0) // overflow        throw new Error("Maximum lock count exceeded");      setState(nextc);      return true;    }    return false;  }}

        thread0第一次释放锁之后,会立刻通过lock.lock()操作继续尝试获取锁。非公平锁的lock()方法会直接尝试获取锁,无视同步队列,因此很大概率会再次获得锁;如果失败了,那么执行nonfairTryAcquire(int acquires)方法,该方法和tryAcquire(int acquires)最大的不同在于,缺少了hasQueuedPredecessors()的判断,即不需要判断当前线程是否是同步队列的第一个可用节点,甚至也不需要判断当前线程是否在同步队列中,直接尝试获取锁即可。

 

四、ReentrantReadWriteLnock

        理解了AQS的原理后,读写锁也就不难理解了。读写锁分为2个锁,读锁和写锁。读锁在同一时刻允许多个线程访问,通过改写int tryAcquireShared(int arg)以及boolean tryReleaseShared(int arg)方法即可;写锁为独占锁,通过改写boolean tryAcquire(int arg)以及boolean tryRelease(int arg)方法即可。

        由于AQS中只提供了一个int state来表示锁的状态,那么如何表示读和写2个锁呢?解决办法是前16位表示读锁,后16位表示写锁。由于锁的状态只有16位,因此无论是对于读锁或者是写锁,其state最大值均为65525,即所有获得了锁的线程的拿到锁的总次数(由于是重进入锁,因此每个线程可以拿到n个锁)不超过65536。由于读写锁主要的应用场景为多读少写,所以如果感觉读锁的65525不够用,可以自己改写读写锁即可,比如分配int state的前24位为读锁,后8位为写锁。

        读写锁还提供了一些新的方法,比如final int getReadHoldCount(),返回当前线程获取读锁的次数。由于读状态保存的是所有获取读锁的线程读锁次数的总和,因此每个线程自己的读锁次数需要单独保存,引入了ThreadLocal,由线程自身维护。