你的位置:首页 > Java教程

[Java教程]第八章 ArrayBlockingQueue源码解析


注意:在阅读本文之前或在阅读的过程中,需要用到ReentrantLock,内容见《第五章 ReentrantLock源码解析1--获得非公平锁与公平锁lock()》《第六章 ReentrantLock源码解析2--释放锁unlock()》《第七章 ReentrantLock总结》

1、对于ArrayBlockingQueue需要掌握以下几点

  • 创建
  • 入队(添加元素)
  • 出队(删除元素)

2、创建

  • public ArrayBlockingQueue(int capacity, boolean fair)
  • public ArrayBlockingQueue(int capacity)

使用方法:

  • Queue<String> abq = new ArrayBlockingQueue<String>(2);
  • Queue<String> abq = new ArrayBlockingQueue<String>(2,true);

通过使用方法,可以看出ArrayBlockingQueue支持ReentrantLock的公平锁模式与非公平锁模式,对于这两种模式,查看本文开头的文章即可。

源代码如下:

  private final E[] items;//底层数据结构  private int takeIndex;//用来为下一个take/poll/remove的索引(出队)  private int putIndex;//用来为下一个put/offer/add的索引(入队)  private int count;//队列中元素的个数  /*   * Concurrency control uses the classic two-condition algorithm found in any   * textbook.   */  /** Main lock guarding all access */  private final ReentrantLock lock;//锁  /** Condition for waiting takes */  private final Condition notEmpty;//等待出队的条件  /** Condition for waiting puts */  private final Condition notFull;//等待入队的条件

View Code
  /**   * 创造一个队列,指定队列容量,指定模式   * @param fair   * true:先来的线程先操作   * false:顺序随机   */  public ArrayBlockingQueue(int capacity, boolean fair) {    if (capacity <= 0)      throw new IllegalArgumentException();    this.items = (E[]) new Object[capacity];//初始化类变量数组items    lock = new ReentrantLock(fair);//初始化类变量锁lock    notEmpty = lock.newCondition();//初始化类变量notEmpty Condition    notFull = lock.newCondition();//初始化类变量notFull Condition  }  /**   * 创造一个队列,指定队列容量,默认模式为非公平模式   * @param capacity <1会抛异常   */  public ArrayBlockingQueue(int capacity) {    this(capacity, false);  }

View Code

注意:

  • ArrayBlockingQueue的组成:一个对象数组+1把锁ReentrantLock+2个条件Condition
  • 在查看源码的过程中,也要模仿带条件锁的使用,这个双条件锁模式是很经典的模式

3、入队

3.1、public boolean offer(E e)

原理:

  • 在队尾插入一个元素, 如果队列没满,立即返回true; 如果队列满了,立即返回false

使用方法:

  • abq.offer("hello1");

源代码:

  /**   * 在队尾插入一个元素,   * 如果队列没满,立即返回true;   * 如果队列满了,立即返回false   * 注意:该方法通常优于add(),因为add()失败直接抛异常   */  public boolean offer(E e) {    if (e == null)      throw new NullPointerException();    final ReentrantLock lock = this.lock;    lock.lock();    try {      if (count == items.length)//数组满了        return false;      else {//数组没满        insert(e);//插入一个元素        return true;      }    } finally {      lock.unlock();    }  }

View Code
  private void insert(E x) {    items[putIndex] = x;//插入元素    putIndex = inc(putIndex);//putIndex+1    ++count;//元素数量+1    /**     * 唤醒一个线程     * 如果有任意一个线程正在等待这个条件,那么选中其中的一个区唤醒。     * 在从等待状态被唤醒之前,被选中的线程必须重新获得锁     */    notEmpty.signal();  }

View Code
  /**   * i+1,数组下标+1   */  final int inc(int i) {    return (++i == items.length) ? 0 : i;  }

View Code

代码非常简单,流程看注释即可,只有一点注意点:

  • 在插入元素结束后,唤醒等待notEmpty条件(即获取元素)的线程,可以发现这类似于生产者-消费者模式

 

3.2、public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException

原理:

  • 在队尾插入一个元素,,如果数组已满,则进入等待,直到出现以下三种情况:
    • 被唤醒
    • 等待时间超时
    • 当前线程被中断

使用方法:

    try {      abq.offer("hello2",1000,TimeUnit.MILLISECONDS);    } catch (InterruptedException e) {      e.printStackTrace();    }

View Code

源代码:

  /**   * 在队尾插入一个元素,   * 如果数组已满,则进入等待,直到出现以下三种情况:   * 1、被唤醒   * 2、等待时间超时   * 3、当前线程被中断   */  public boolean offer(E e, long timeout, TimeUnit unit)      throws InterruptedException {    if (e == null)      throw new NullPointerException();    long nanos = unit.toNanos(timeout);//将超时时间转换为纳秒    final ReentrantLock lock = this.lock;    /*     * lockInterruptibly():     * 1、 在当前线程没有被中断的情况下获取锁。     * 2、如果获取成功,方法结束。     * 3、如果锁无法获取,当前线程被阻塞,直到下面情况发生:     * 1)当前线程(被唤醒后)成功获取锁     * 2)当前线程被其他线程中断     *     * lock()     * 获取锁,如果锁无法获取,当前线程被阻塞,直到锁可以获取并获取成功为止。     */    lock.lockInterruptibly();//加可中断的锁    try {      for (;;) {        if (count != items.length) {//队列未满          insert(e);          return true;        }        if (nanos <= 0)//已超时          return false;        try {          /*           * 进行等待:           * 在这个过程中可能发生三件事:           * 1、被唤醒-->继续当前这个for(;;)循环           * 2、超时-->继续当前这个for(;;)循环           * 3、被中断-->之后直接执行catch部分的代码           */          nanos = notFull.awaitNanos(nanos);//进行等待(在此过程中,时间会流失,在此过程中,线程也可能被唤醒)        } catch (InterruptedException ie) {//在等待的过程中线程被中断          notFull.signal(); // 唤醒其他未被中断的线程          throw ie;        }      }    } finally {      lock.unlock();    }  }

View Code

注意:

  • awaitNanos(nanos)是AQS中的一个方法,这里就不详细说了,有兴趣的自己去查看AQS的源代码。
  • lockInterruptibly()与lock()的区别见注释

 

3.3、public void put(E e) throws InterruptedException

原理:

  • 在队尾插入一个元素,如果队列满了,一直阻塞,直到数组不满了或者线程被中断

使用方法:

    try {      abq.put("hello1");    } catch (InterruptedException e) {      e.printStackTrace();    }

View Code

源代码:

  /**   * 在队尾插入一个元素   * 如果队列满了,一直阻塞,直到数组不满了或者线程被中断   */  public void put(E e) throws InterruptedException {    if (e == null)      throw new NullPointerException();    final E[] items = this.items;    final ReentrantLock lock = this.lock;    lock.lockInterruptibly();    try {      try {        while (count == items.length)//队列满了,一直阻塞在这里          /*           * 一直等待条件notFull,即被其他线程唤醒           * (唤醒其实就是,有线程将一个元素出队了,然后调用notFull.signal()唤醒其他等待这个条件的线程,同时队列也不慢了)           */          notFull.await();      } catch (InterruptedException ie) {//如果被中断        notFull.signal(); // 唤醒其他等待该条件(notFull,即入队)的线程        throw ie;      }      insert(e);    } finally {      lock.unlock();    }  }

View Code

 

4、出队

4.1、public E poll()

原理:

  • 如果没有元素,直接返回null;如果有元素,将队头元素置null,但是要注意队头是随时变化的,并非一直是items[0]。

使用方法:

abq.poll();

源代码:

  /**   * 出队   */  public E poll() {    final ReentrantLock lock = this.lock;    lock.lock();    try {      if (count == 0)//如果没有元素,直接返回null,而非抛出异常        return null;      E x = extract();      return x;    } finally {      lock.unlock();    }  }

View Code
  /**   * 出队   */  private E extract() {    final E[] items = this.items;    E x = items[takeIndex];//获取出队元素    items[takeIndex] = null;//将出队元素位置置空    /*     * 第一次出队的元素takeIndex==0,第二次出队的元素takeIndex==1     * (注意:这里出队之后,并没有将后面的数组元素向前移)     */    takeIndex = inc(takeIndex);    --count;//数组元素个数-1    notFull.signal();//数组已经不满了,唤醒其他等待notFull条件的线程    return x;//返回出队的元素  }

View Code

 

4.2、public E poll(long timeout, TimeUnit unit) throws InterruptedException

原理:

  • 从对头删除一个元素,如果数组不空,出队;如果数组已空且已经超时,返回null;如果数组已空且时间未超时,则进入等待,直到出现以下三种情况:
    • 被唤醒
    • 等待时间超时
    • 当前线程被中断

使用方法:

    try {      abq.poll(1000, TimeUnit.MILLISECONDS);    } catch (InterruptedException e) {      e.printStackTrace();    }

View Code

源代码:

  /**   * 从对头删除一个元素,   * 如果数组不空,出队;   * 如果数组已空,判断时间是否超时,如果已经超时,返回null   * 如果数组已空且时间未超时,则进入等待,直到出现以下三种情况:   * 1、被唤醒   * 2、等待时间超时   * 3、当前线程被中断   */  public E poll(long timeout, TimeUnit unit) throws InterruptedException {    long nanos = unit.toNanos(timeout);//将时间转换为纳秒    final ReentrantLock lock = this.lock;    lock.lockInterruptibly();    try {      for (;;) {        if (count != 0) {//数组不空          E x = extract();//出队          return x;        }        if (nanos <= 0)//时间超时          return null;        try {          /*           * 进行等待:           * 在这个过程中可能发生三件事:           * 1、被唤醒-->继续当前这个for(;;)循环           * 2、超时-->继续当前这个for(;;)循环           * 3、被中断-->之后直接执行catch部分的代码           */          nanos = notEmpty.awaitNanos(nanos);        } catch (InterruptedException ie) {          notEmpty.signal(); // propagate to non-interrupted thread          throw ie;        }      }    } finally {      lock.unlock();    }  }

View Code

 

4.3、public E take() throws InterruptedException

原理:

  • 将队头元素出队,如果队列空了,一直阻塞,直到数组不为空或者线程被中断

使用方法:

    try {      abq.take();    } catch (InterruptedException e) {      e.printStackTrace();    }

View Code

源代码:

  /**   * 将队头元素出队   * 如果队列空了,一直阻塞,直到数组不为空或者线程被中断   */  public E take() throws InterruptedException {    final ReentrantLock lock = this.lock;    lock.lockInterruptibly();    try {      try {        while (count == 0)//如果数组为空,一直阻塞在这里          /*           * 一直等待条件notEmpty,即被其他线程唤醒           * (唤醒其实就是,有线程将一个元素入队了,然后调用notEmpty.signal()唤醒其他等待这个条件的线程,同时队列也不空了)           */          notEmpty.await();      } catch (InterruptedException ie) {        notEmpty.signal(); // propagate to non-interrupted thread        throw ie;      }      E x = extract();      return x;    } finally {      lock.unlock();    }  }

View Code

  

总结:

1、具体入队与出队的原理图:这里只说一种情况,见下图,途中深色部分表示已有元素,浅色部分没有元素。

 

上面这种情况是怎么形成的呢?当队列满了,这时候,队头元素为items[0]出队了,就形成上边的这种情况。

假设现在又要出队了,则现在的队头元素是items[1],出队后就形成下面的情形。

 

出队后,对头元素就是items[2]了,假设现在有一个元素将要入队,根据inc方法,我们可以得知,他要插入到items[0]去,入队了形成下图:

以上就是整个入队出队的流程,inc方法上边已经给出,这里再贴一遍:

  /**   * i+1,数组下标+1   * 注意:这里这样写的原因。   */  final int inc(int i) {    return (++i == items.length) ? 0 : i;  }

View Code

 

2、三种入队对比:

  • offer(E e):如果队列没满,立即返回true; 如果队列满了,立即返回false-->不阻塞
  • put(E e):如果队列满了,一直阻塞,直到数组不满了或者线程被中断-->阻塞
  • offer(E e, long timeout, TimeUnit unit):在队尾插入一个元素,,如果数组已满,则进入等待,直到出现以下三种情况:-->阻塞
    • 被唤醒
    • 等待时间超时
    • 当前线程被中断

 

3、三种出对对比:

  • poll():如果没有元素,直接返回null;如果有元素,出队
  • take():如果队列空了,一直阻塞,直到数组不为空或者线程被中断-->阻塞
  • poll(long timeout, TimeUnit unit):如果数组不空,出队;如果数组已空且已经超时,返回null;如果数组已空且时间未超时,则进入等待,直到出现以下三种情况:
    • 被唤醒
    • 等待时间超时
    • 当前线程被中断