你的位置:首页 > Java教程

[Java教程]Java 线程通信


线程通信用来保证线程协调运行,一般在做线程同步的时候才需要考虑线程通信的问题。

1、传统的线程通信

通常利用Objeclt类提供的三个方法:

wait() 导致当前线程等待,并释放该同步监视器的锁定,直到其它线程调用该同步监视器的notify()或者notifyAll()方法唤醒线程。

notify(),唤醒在此同步监视器上等待的线程,如果有多个会任意选择一个唤醒

notifyAll() 唤醒在此同步监视器上等待的所有线程,这些线程通过调度竞争资源后,某个线程获取此同步监视器的锁,然后得以运行。

这三个方法必须由同步监视器对象调用,分为两张情况:

同步方法时,由于同步监视器为this对象,所以可以直接调用这三个方法。

示例如下:

public class SyncMethodThreadCommunication {  static class DataWrap{    int data = 0;    boolean flag = false;        public synchronized void addThreadA(){      if (flag) {        try {          wait();        } catch (InterruptedException e) {          e.printStackTrace();        }      }             data++;      System.out.println(Thread.currentThread().getName() + " " + data);      flag = true;      notify();    }        public synchronized void addThreadB() {      if (!flag) {        try {          wait();        } catch (InterruptedException e) {          e.printStackTrace();        }      }             data++;      System.out.println(Thread.currentThread().getName() + " " + data);      flag = false;      notify();    }  }    static class ThreadA extends Thread {    private DataWrap data;        public ThreadA(DataWrap dataWrap) {      this.data = dataWrap;    }        @Override    public void run() {      for (int i = 0; i < 10; i++) {        data.addThreadA();      }    }  }    static class ThreadB extends Thread {    private DataWrap data;        public ThreadB(DataWrap dataWrap) {      this.data = dataWrap;    }        @Override    public void run() {      for (int i = 0; i < 10; i++) {        data.addThreadB();      }    }  }    public static void main(String[] args) {    //实现两个线程轮流对数据进行加一操作    DataWrap dataWrap = new DataWrap();        new ThreadA(dataWrap).start();    new ThreadB(dataWrap).start();  }}

 

同步代码块时,需要使用监视器对象调用这三个方法。

示例如下:

public class SyncBlockThreadComminication {  static class DataWrap{    boolean flag;    int data;  }    static class ThreadA extends Thread{    DataWrap dataWrap;        public ThreadA(DataWrap dataWrap){      this.dataWrap = dataWrap;    }        @Override    public void run() {      for(int i = 0 ; i < 10; i++) {        synchronized (dataWrap) {          if (dataWrap.flag) {            try {              dataWrap.wait();            } catch (InterruptedException e) {              e.printStackTrace();            }          }                    dataWrap.data++;          System.out.println(getName() + " " + dataWrap.data);          dataWrap.flag = true;          dataWrap.notify();        }        }    }  }    static class ThreadB extends Thread{    DataWrap dataWrap;        public ThreadB(DataWrap dataWrap){      this.dataWrap = dataWrap;    }        @Override    public void run() {      for (int i = 0; i < 10; i++) {          synchronized (dataWrap) {            if (!dataWrap.flag) {              try {                dataWrap.wait();              } catch (InterruptedException e) {                e.printStackTrace();              }            }                        dataWrap.data++;            System.out.println(getName() + " " + dataWrap.data);            dataWrap.flag = false;            dataWrap.notify();          }        }        }        }  public static void main(String[] args) {    //实现两个线程轮流对数据进行加一操作        DataWrap dataWrap = new DataWrap();    new ThreadA(dataWrap).start();    new ThreadB(dataWrap).start();  }}

2、使用Condition控制线程通信

 当使用Lock对象保证同步时,则使用Condition对象来保证协调。

示例如下:

import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;import com.sun.media.sound.RIFFInvalidDataException;import javafx.scene.chart.PieChart.Data;public class SyncLockThreadCommunication {  static class DataWrap {    int data;    boolean flag;        private final Lock lock = new ReentrantLock();    private final Condition condition = lock.newCondition();        public void addThreadA() {      lock.lock();      try {        if (flag) {          try {            condition.await();          } catch (InterruptedException e) {            e.printStackTrace();          }        }                data++;        System.out.println(Thread.currentThread().getName() + " " + data);        flag = true;        condition.signal();      } finally {        lock.unlock();      }    }        public void addThreadB() {      lock.lock();      try {        if (!flag) {          try {            condition.await();          } catch (InterruptedException e) {            e.printStackTrace();          }        }                data++;        System.out.println(Thread.currentThread().getName() + " " + data);        flag = false;        condition.signal();      } finally {        lock.unlock();      }    }  }    static class ThreadA extends Thread{    DataWrap dataWrap;        public ThreadA(DataWrap dataWrap) {      this.dataWrap = dataWrap;    }        @Override    public void run() {      for (int i = 0; i < 10; i++) {        dataWrap.addThreadA();      }    }  }    static class ThreadB extends Thread{    DataWrap dataWrap;        public ThreadB(DataWrap dataWrap) {      this.dataWrap = dataWrap;    }        @Override    public void run() {      for (int i = 0; i < 10; i++) {        dataWrap.addThreadB();      }    }  }    public static void main(String[] args) {    //实现两个线程轮流对数据进行加一操作        DataWrap dataWrap = new DataWrap();    new ThreadA(dataWrap).start();    new ThreadB(dataWrap).start();  }}

其中Condition对象的await(), singal(),singalAll()分别对应wait(),notify()和notifyAll()方法。

3、使用阻塞队列BlockingQueue控制线程通信

BlockingQueue是Queue接口的子接口,主要用来做线程通信使用,它具有一个特征:当生产者线程试图向BlockingQueue中放入元素时,如果队列已满,则该线程被阻塞;当消费者线程试图从BlockingQueue中取出元素时,如果队列已空,则该线程被阻塞。这两个特征分别对应两个支持阻塞的方法,put(E e)和take()

示例如下:

import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;public class BlockingQueueThreadComminication {  static class DataWrap{    int data;  }    static class ThreadA extends Thread{    private BlockingQueue<DataWrap> blockingQueue;        public ThreadA(BlockingQueue<DataWrap> blockingQueue, String name) {      super(name);      this.blockingQueue = blockingQueue;    }        @Override    public void run() {      for (int i = 0; i < 100; i++) {        try {          DataWrap dataWrap = blockingQueue.take();                    dataWrap.data++;          System.out.println(getName() + " " + dataWrap.data);          sleep(1000);        } catch (InterruptedException e) {          e.printStackTrace();        }      }    }  }    static class ThreadB extends Thread{    private BlockingQueue<DataWrap> blockingQueue;    private DataWrap dataWrap;        public ThreadB(BlockingQueue<DataWrap> blockingQueue, DataWrap dataWrap, String name) {      super(name);      this.blockingQueue = blockingQueue;      this.dataWrap = dataWrap;    }        @Override    public void run() {      for (int i = 0; i < 100; i++) {        try {          dataWrap.data++;          System.out.println(getName() + " " + dataWrap.data);          blockingQueue.put(dataWrap);          sleep(1000);        } catch (InterruptedException e) {          e.printStackTrace();        }      }    }  }    public static void main(String[] args) {    ///实现两个线程轮流对数据进行加一操作        DataWrap dataWrap = new DataWrap();    BlockingQueue<DataWrap> blockingQueue = new ArrayBlockingQueue<>(1);        new ThreadA(blockingQueue, "Consumer").start();    new ThreadB(blockingQueue, dataWrap, "Producer").start();  }}

BlockingQueue共有五个实现类:

ArrayBlockingQueue 基于数组实现的BlockingQueue队列

LinkedBlockingQueue 基于链表实现的BlockingQueue队列

PriorityBlockingQueue 中元素需实现Comparable接口,其中元素的排序是按照Comparator进行的定制排序。

SynchronousQueue 同步队列,要求对该队列的存取操作必须是交替进行。

DelayQueue 集合元素必须实现Delay接口,队列中元素排序按照Delay接口方法getDelay()的返回值进行排序。