你的位置:首页 > Java教程

[Java教程]Java多线程整理(一)


目录:

1.volatile变量

2.Java并发编程学习

3.CountDownLatch用法

4.CyclicBarrier使用

5.BlockingQueue使用

6.任务执行器Executor
7.CompletionService使用
8.ConcurrentHashMap使用
9.Lock使用

 

 

一、 volatile变量

  1.volatile原理:volatile的原理实际上是告诉处理器,不要把变量缓存在寄存器或者相对于其他处理器不可见的地方,而是把变量放在主存,每次读写操作都在主存上进行操作。另外,被申明为volatile的变量也不会与其它内存中的变量进行重排序。

  2.volatile同步:volatile是同步的一个子集,只保证了变量的可见性,但是不具备原子特性。这就是说线程能够自动发现 volatile 变量的最新值。相对于同步而言,volatile的优势:a.简易性,可以像使用其他变量一样使用volatile变量;b.volatile变量不会造成线程阻塞;c.如果读操作远远大于写操作,volatile 变量还可以提供优于锁的性能优势。

  3.正确使用volatile条件:对变量的写操作不依赖于当前值;该变量没有包含在具有其他变量的不变式中;

/* * 对于第一条原则:对变量的写操作不依赖于当前值; * 虽然i++只有一条语句,实际上这条语句是分三步执行的,读入i,i加1,写入i; * 若在第三步执行过程前,其他线程对i进行了改动,此时的结果将是错的。因此即使使用了volatile进行控制,并不能保证这个操作是线程安全的。*/private volatile int i=1;... i++;/* * 这类问题的解决方案有两种: * 1.一种是采用synchronized进行同步控制,这显然违背了volatile的初衷 * 2.一种是采用CPU原语进行控制。在jdk1.5之后,java.util.concurrent.atomic包下的很多类就是采用这种方式进行控制,这样可以在保持性能的情况下,保证数据的线程安全。 */

二、Java并发编程学习

  在java 并发编程实践中对线程安全的定义如下:当多个线程访问一个类时,如果不用考虑这些线程在运行时环境下的调度和交替运行,并且不需要额外的同步及在调用方代码不必做其他的协调,这个类的行为仍然是正确的,那么这个类就是线程安全的。完全由线程安全的类构成的程序不一定就是线程安全的。

/* * 如下代码所示: * 虽然Vector是一个线程安全的类,但是对于由Vector线程安全的方法组成上面的逻辑,显然不是线程安全的。 * 当然,由线程不安全的类构成的程序,也不一定不是线程安全的。 */Vector v; if(!v.contains(o)){v.add(o);}/* * 微妙的可见性: * 当变量的读入写出被不同的线程共享时,必须使用同步。若不使用同步将有可能发生与直觉大相径庭的错误。 */public class TestVisiable {  static int x = 0, y = 0;  static int b = 0, a = 0;  public static void main(String[] args) throws InterruptedException {    Thread t1 = new Thread(new Runnable() {      @Override      public void run() {        a = 1;        x = b;      }    });    Thread t2 = new Thread(new Runnable() {      @Override      public void run() {        b = 1;        y = a;      }    });    t1.start();    t2.start();    t1.join();    t2.join();    System.out.println(x + " " + y);  }}/* * 由于没有同步,程序运行的可能结果为(1,0) (0,1),(1,1),然而,还有可能为(0,0),不可思议吧。 * 1.这是由于为了加快并发执行,JVM内部采用了重排序的机制导致。 * 2.最低限的安全性在java中,java存储模型要求java变量的获取和存储都是原子操作,但是有两种类型的变量是比较特殊的,没有申明为volitale的long和double变量。 * 原因在于: * 1.在java中long和double变量为64位,jvm允许将64位的读写操作划分为两个32位的操作。 * 2.当多个线程同时读写非volatile的long或者double类型数据时,将有可能得到数据是一个数的高32位和另一个数的低32位组成的数。 * 3.这样的结果显然是完全不对的。因此对应long或double类型的数据用于多线程共享时,必须申明加上volatile或者进行同步。 */

三、CountDownLatch用法

  在jdk API中如下描述:一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。用给定的计数初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。

  CountDownLatch 是一个通用同步工具,它有很多用途。将计数 1 初始化的 CountDownLatch 用作一个简单的开/关锁存器,或入口:在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。

/* * 下面给出一个详细的应用场景 百米赛跑: * 比赛共有10名运动员参与,10名运动在起跑线上统一等待号令员发起起跑的号令,通过终点时号令员统计该运动员的时间,当所有运动员都跑到终点时,报告每个人的成绩。 * 这里实际上就可以采用CountDownLatch来解决起跑线上设置一个CountDownLatch,当号令员没有发起起跑号令时,所有运动员都在起跑线上等待。 * 在终点设置一个CountDownLatch,起跑后号令员一直等待,当所有运动员都通过终点后,它会报告成绩。 用代码实现如下: */import java.util.HashMap;import java.util.Map;import java.util.concurrent.CountDownLatch;public class Commander {  private final CountDownLatch   startSignal;              // 起跑信号  private final CountDownLatch   endSignal;               // 终点信号  private Long           startTime;  private final Map<Integer, Long> scoreMap = new HashMap<Integer, Long>();  public void waitStart() {    try {      startSignal.await();    } catch (InterruptedException e) {      e.printStackTrace();    }  }  public synchronized void start() {    startTime = System.currentTimeMillis();    startSignal.countDown();    try {      endSignal.await();    } catch (InterruptedException e) {      e.printStackTrace();    }  }  public void reach(int id) {    endSignal.countDown();    scoreMap.put(id, System.currentTimeMillis() - startTime);// 统计时间  }  public Commander(int num){    startSignal = new CountDownLatch(1);    endSignal = new CountDownLatch(num);  }  public static void main(String[] args) {    int runnerNum = 10;    Commander c = new Commander(runnerNum);    for (int i = 0; i < runnerNum; i++) {      new Thread(new Runner(i + 1, c)).start();    }    c.start(); // 发起号令    for (Integer i : c.scoreMap.keySet()) {      System.out.println(i + "号运动员,耗时" + c.scoreMap.get(i));    }  }}class Runner implements Runnable {  Commander commander;  int    id;  public Runner(int id, Commander commander){    this.id = id;    this.commander = commander;  }  @Override  public void run() {    commander.waitStart();    try {      Thread.sleep((long) (Math.random() * 1000));    } catch (InterruptedException e) {      e.printStackTrace();    }    commander.reach(id);  }}

四、CyclicBarrier使用

  CyclicBarrier在jdk中描述:一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环的 barrier。

  实际上CyclicBarrier类似于CountDownLatch也是个计数器。不同的是,当线程调用await方法后,必须所有线程都到达了,才能分别进入后面的执行过程。一旦有一个线程未到达,所有线程都会等待,有点像一部电影的名字《一个都不能少》。

/* * 考虑一个应用场景: * 老师带领学生去春游,下午回来时,需要整队,然后统一坐车回去。 * 在这里,老师和已经到达的学生,必须等待所有学生归队后才能回去。 用代码实现如下: */import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class Teacher {  private final CyclicBarrier barrier = new CyclicBarrier(20, new Runnable() {                      @Override                      public void run() {                        System.out.println("所有学生已经归队");                      }                    });  public void comeBack(int i) {    System.out.println(i + "已经归队。");    try {      barrier.await();    } catch (InterruptedException e) {      e.printStackTrace();    } catch (BrokenBarrierException e) {      e.printStackTrace();    }  }  public static void main(String[] args) {    Teacher t = new Teacher();    for (int i = 0; i < 20; i++) {      new Thread(new Student(i + 1, t)).start();    }  }}class Student implements Runnable {  private final int   id;  private final Teacher teacher;  public Student(int id, Teacher teacher){    this.id = id;    this.teacher = teacher;  }  @Override  public void run() {    try {      Thread.sleep((long) (Math.random() * 1000));      teacher.comeBack(id);      System.out.println(id + "上车");    } catch (InterruptedException e) {      e.printStackTrace();    }  }}

五、BlockingQueue 学习

  从名字上看,BlockingQueue是阻塞队列的意思。这个队列主要提供下面的功能:

  1.阻塞队列提供了可阻塞的take和put方法,另外可定时的poll和offer实际原理也是一样的。

  2.如果BlockingQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒,同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间时才会被唤醒继续操作。

  阻塞队列有一般又为两类:无限阻塞队列和有限阻塞队列。有限阻塞队列中,当队列满时,调用put方法将会阻塞;而无限阻塞队列中,put方法是不会阻塞的。很显然这个队列的一种最常用的场景就是:生产者-消费者 模式。

/* 它有以下具体实现类: * ArrayBlockingQueue:采用数组作为存储队列的阻塞队列,这个队列采用FIFO的方式管理数据 * LinkedBlockingQueue:采用链式结构作为存储队列,同样它也采用FIFO的方式管理数据 * PriorityBlockingQueue:采用基于优先级堆的极大优先级队列作为存储队列。 * SynchronousQueue:特殊的BlockingQueue,其中每个 put 必须等待一个 take,反之亦然。 * DelayQueue:这是一个无限阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部。 * * 通过查看源代码可以看到这个类内部是采用PriorityQueue作为存储队列 * DelayQueue的一些常用的场景 * a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。 * b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。 * c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。 * */

六、任务执行器Executor

  记得在大学的时候,有一次写程序时,需要创建很多的线程来处理各种socket请求,于是写了一个线程类,每出现一个socket请求,就创建一个线程。后来,老师指出,每次创建线程的开销比较大,可以将线程与具体的业务逻辑分离开来,然后用一个队列,保存一定量的线程,每次需要的时候就去取,不用的时候就还回去,这样可以循环使用,避免重复创建线程的开销,于是乎,对代码进行重构,写了大段的代码,发现以后需要用到多线程的地方都可要用它,后来在网上找资料才知道,那叫线程池。

  jdk1.5的升级,给我们带来一个很特殊的包java.util.concurrent,翻阅API,可以看到jdk中已经封装了线程池,简单两行便可实现。这个包中提供了Executor的接口,接口定义如下:

void execute(Runnable command);

传入一个runnable接口的实现,它便自动为我们创建线程和执行,任务的提交者再也不用为了并发执行,自己写一大段代码来创建并管理各种线程。扩展了Executor的有ExecutorService, ScheduledExecutorService,AbstractExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor.由于Runnable接口中只提供了一个不带返回值run方法,因此当任务需要返回值时,Executor就不能满足需求了,于是出现了ExecutorService,这个接口继承了Executor,对提交任务的接口进行了扩展,引入了Callable接口,该接口定义如下:

public interface Callable<V> {  V call() throws Exception;}

同时接口将任务执行过程进行管理,分为三个状态,提交,shutdown,terminate。在AbstractExecutorService中可以看到submit(Callable c)的实现,实际上它会先创建一个Future对象,然后再调用execute(Runnable command)方法,执行任务。可以很明显的知道Future肯定是继承了Runnable接口。通过Future接口,我们可以获取由call方法调用的返回值。Executor接口的两个具体实现是ThreadPoolExecutor和ScheduledThreadPoolExecutor,通过名字可以看出,ThreadPoolExecutor是通过采用线程池来执行每个提交的任务,ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,主要用于满足延迟后运行任务,或者定期执行任务的需求。虽然,我们可以通过直接调用ThreadPoolExecutor和ScheduledThreadPoolExecutor的构造函数生成Executor,但是很多情况下,没有这个必要,因为jdk为我们作了简化工作,通过Executors这个工厂类,可以只需传入一些简单的参数,便可以得到我们需要的Executor对象。

七、CompletionService学习

  若在采用Executor执行任务时,可用通过采用Future来获取单个任务执行的结果,在Future中提供了一个get方法,该方法在任务执行返回之前,将会阻塞。当向Executor提交批处理任务时,并且希望在它们完成后获得结果,如果用FutureTask,你可以循环获取task,并用future.get()去获取结果,若没有完成则阻塞,这对于对任务结果需要分别对待的时候是可行的。但是若所有task产生的结果都可以被同等看待,这时候采用前面这样的方式显然是不可行了,因为若当前的task没有完成,而后面的其它task已经完成,你也得等待,这个实效性不高。显然很多时候,统一组task产生的结果都应该是没有区别的,也就是满足上述第二种情况。这个时候咋办呢?jdk为我们提供了一个很好的接口CompletionService,这个接口的具体实现类是ExecutorCompletionService。该类中定义下面三个属性:

private final Executor executor;private final AbstractExecutorService aes;private final BlockingQueue<Future<V>> completionQueue;

executor由构造函数传入,aes只是用于生成Future对象。特别要注意是completionQueue。它维护了一个保存Future对象的BlockingQueue。当这个Future对象状态是结束的状态的时候,也就是task执行完成之后,会将它加入到这个Queue中。到底是在哪里将完成的Future加入到队列里面的呢?又是怎么知道task是什么时候结束的呢?在ExecutorCompletionService中定义了一个QueueingFuture类,该类的实现:

private class QueueingFuture extends FutureTask<Void> {    QueueingFuture(RunnableFuture<V> task) {      super(task, null);      this.task = task;    }    protected void done() { completionQueue.add(task); }    private final Future<V> task;  }

可以看到在done方法中,它会把当前的task加入到阻塞队列中。追踪done方法可以看到,该方法定义在FutureTask中,默认实现为空,从注释可以看出,当Future的状态转为isDone的时候,就会调用该方法。调用端在调用CompletionService的take方法时,实际上调用的是BlockingQueue的take方法,由前面的学习中,我们知道,当队列中有内容时,该队列会立即返回队列中的对象,当队列为空时,调用线程将会阻塞。而只要有任务完成,调用线程就会跳出阻塞,获得结果。

八、ConcurrentHashMap学习

  java中提供对HashMap是我们使用得比较多的一个类,单该类是非线程安全的,若处于多线程环境中,则需要通过synchronized关键字进行同步(通过查看Collections.synchronizedMap方法,可以知道,实际上该方法对实现也是通过synchronized关键字进行同步控制),虽然它能保证线程安全,但是,由于需要对整个map进行加锁,这样做的并发性能往往不是很理想,尤其是map中数据量比较大的时候。jdk1.5之后,java.util.concurrent包中提供了ConcurrentHashMap,一方面,该类自己保证了线程安全性,另一方面,该类也提供了一些复合操作的原子性接口。

  ConcurrentHashMap与HashMap一样,同样是哈希表,但是采用不同对锁机制--分离锁,即采用不同的锁来同步不同的数据块,以减少对锁对竞争。   在ConcurrentHashMap内部采用了一个包含16个锁对象对数组,每个锁负责同步hash Bucket的1/16,bucket中的每个对象通过它的hashCode计算得到它所在锁对象。假设hash算法的实现能够提供合理的扩展性,并且关键字能够以统一的方式访问,这会将对于锁的请求减少到原来的1/16.这种基于分离锁设计的技术实现能够使得ConcurrentHashMap支持16个并发的请求。

九、Lock使用

  在java5.0以前都是采用synchronized关键字进行同步控制,所有对象都自动含有单一的锁,JVM负责跟踪对象被加锁的次数。如果一个对象被解锁,其计数变为0。在任务(线程)第一次给对象加锁的时候,计数变为1。每当这个相同的任务(线程)在此对象上获得锁时,计数会递增。只有首先获得锁的任务(线程)才能继续获取该对象上的多个锁。每当任务离开一个synchronized方法,计数递减,当计数为0的时候,锁被完全释放,此时别的任务就可以使用此资源。由于这些锁是由JVM来控制,因此也叫隐式锁。

  在jdk5.0以后,提供了显示锁,即Lock,可以说是对隐式锁的功能的扩展,主要有两个。一个是ReentrantLock,另一个是ReentrantReadWriteLock,前者是普通重入锁,后者是可重入的读写锁。这些锁提供了两种锁竞争机制:公平竞争和非公平竞争,公平竞争的实现实际上是采用一个队列保存等待的线程,当当前线程释放锁之后,取出队头的线程唤醒,使之可以获取锁。java中Lock接口的定义:

public interface Lock {  void lock();  void lockInterruptibly() throws InterruptedException;  boolean tryLock();  boolean tryLock(long time, TimeUnit unit) throws InterruptedException;  void unlock();  Condition newCondition();}

Lock与synchronized的区别:

1.synchronized是在JVM层面上实现的,不但可以通过一些监控工具监控synchronized的锁定,而且在代码执行时出现异常,JVM会自动释放锁。而Lock的释放必须由程序自己保证,常用的写法是把是把释放锁的代码写到try{}finally中,在finally块中释放锁。

2.如果使用 synchronized ,如果A不释放,B将一直等下去,不能被中断,如果使用Lock,如果A不释放,可以使B在等待了足够长的时间以后,中断等待,而干别的事情。

3.synchronize实际上是ReentrantLock和Condition的组合的简化版

4.相对于synchonized独占锁,ReentrantReadWriteLock通过分离读锁和写锁,提供可共享的读锁提高读并发性能。