你的位置:首页 > Java教程

[Java教程]【JAVA并发】共享资源访问


什么是共享资源

先了解下JAVA程序在运行时内存的分布,由以下部分组成

  1. :所有线程共享一个堆;存放的都是new 出来的对象;
  2. 方法区:所有线程共享一个方法区;里面存放的内容有点杂,可以认为是除堆和栈中的其它东西(如类信息,静态变量,常量,代码等);
  3. 程序计数器:也叫PC,存放下一条指令所在单元的地址的地方;
  4. JAVA栈:每个线程都有一个自己的JAVA栈;存放的一般是方法的局部变量,方法出口信息等;
  5. 本地方法栈:与JAVA栈类似,区别是使用的对象不一样,本地方法栈是给Native方法使用的,JAVA栈是给JAVA方法使用的;

注意上面标红的堆和方法区的数据,是多个线程的共享的资源,会出现多个线程访问共享资源的情况;如多个线程访问同一个对象的成员变量,多个线程访问静态全局变量等...

多个线程不正确的访问共享资源

举个例子,多个线程访问同一个对象的情况,这里被访问的对象为共享资源,代码如下:

定义一个抽象类IntGenerator,类中定义一个next方法用于返回整型值,再定义一个canceled成员属性用于判断该对象是否被取消,注意canceled是boolean类型,关于canceled的操作(赋值和返回操作)都是原子性的,且canceled变量被设置为volatile,保证线程间的可见性;

package concurrency;public abstract class IntGenerator {  private volatile boolean canceled = false;  public abstract int next();  public void cancel() {    canceled = true;  }  public boolean isCanceled() {    return canceled;  }}

View Code

再定义一个具体偶数生产者EvenGenerator,继承IntGenerator类,实现next方法(两个自增操作),注意该操作不是原子性;

package concurrency;public class EvenGenerator extends IntGenerator {  private int currentEvenValue = 0;  public int next() {    ++currentEvenValue; // Danger point here!    ++currentEvenValue;    return currentEvenValue;  }  public static void main(String[] args) {    EvenChecker.test(new EvenGenerator());  }}

定义EvenChecker类,实现Runnable接口,并编写一个test方法,产生10个线程访问同一个EvenGenerator对象,每个线程都调用next方法获取返回值,并判断是否是偶数,如果不是偶数,终止线程返回;:

package concurrency;import java.util.concurrent.*;public class EvenChecker implements Runnable {  private IntGenerator generator;  private final int id;  public EvenChecker(IntGenerator g, int ident) {    generator = g;    id = ident;  }  public void run() {    while (!generator.isCanceled()) {      int val = generator.next();      if (val % 2 != 0) {        System.out.println(val + " not even!");        generator.cancel(); // Cancels all EvenCheckers      }    }  }  public static void test(IntGenerator gp, int count) {    System.out.println("Press Control-C to exit");    ExecutorService exec = Executors.newCachedThreadPool();    for (int i = 0; i < count; i++)      exec.execute(new EvenChecker(gp, i));    exec.shutdown();  }  public static void test(IntGenerator gp) {    test(gp, 10);  }}

View Code

运行程序,发现这个程序很快就终止了,因为同一个资源在未加任何保护措施的情况下被多个线程不正确的访问了;

注意,currentEvenValue变量自增需要多个步骤,在这个过程中该线程很有可能会被挂起,此时其它线程访问该变量就会导致结果异常(出现奇数);

有兴趣的可以在next方法前面加个锁,就不会出现异常结果了;

 解决共享资源竞争

加锁,synchronized关键字

如下代码,对上面提到的next方法加锁:

  public synchronized int next() {    ++currentEvenValue; // Danger point here!    ++currentEvenValue;    return currentEvenValue;  }

注意该锁为对象锁,当对象被加锁时,该对象上的其它synchronized方法只有等前一个加锁的方法调用结束释放锁后才开始执行,简单的说,所有的synchronized方法共享同一个对象锁;

还有一种锁叫类锁,一般用在对静态方法加锁,或者synchronized(xxx.class)代码段上;

使用Lock对象

如下写法,显示的调用lock和unlock方法;

package concurrency;import java.util.concurrent.locks.*;public class MutexEvenGenerator extends IntGenerator {  private int currentEvenValue = 0;  private Lock lock = new ReentrantLock();  public int next() {    lock.lock();    try {      ++currentEvenValue;      Thread.yield(); // Cause failure faster      ++currentEvenValue;      return currentEvenValue;    } finally {      lock.unlock();    }  }  public static void main(String[] args) {    EvenChecker.test(new MutexEvenGenerator());  }}

显示的调用lock和unlock方法比内建的synchronized锁更加灵活,允许你尝试去获取锁,或尝试去获取锁一段时间,而不是一直等待获取锁;在尝试获取锁的时候,如果其它线程已经获取这个锁,则可以决定去做一些其它事情,以下代码为尝试获取锁的例子,

package concurrency;import java.util.concurrent.*;import java.util.concurrent.locks.*;public class AttemptLocking {  private ReentrantLock lock = new ReentrantLock();  public void untimed() {    boolean captured = lock.tryLock();    try {      System.out.println("tryLock(): " + captured);    } finally {      if (captured)        lock.unlock();    }  }  public void timed() {    boolean captured = false;    try {      captured = lock.tryLock(2, TimeUnit.SECONDS);    } catch (InterruptedException e) {      throw new RuntimeException(e);    }    try {      System.out.println("tryLock(2, TimeUnit.SECONDS): " + captured);    } finally {      if (captured)        lock.unlock();    }  }  public static void main(String[] args) {    final AttemptLocking al = new AttemptLocking();    al.untimed(); // True -- lock is available    al.timed(); // True -- lock is available    // Now create a separate task to grab the lock:    new Thread() {      {        setDaemon(true);      }      public void run() {        al.lock.lock();        System.out.println("acquired");      }    }.start();    //Thread.yield(); // Give the 2nd task a chance    try {      TimeUnit.MILLISECONDS.sleep(55);    } catch (InterruptedException e) {      e.printStackTrace();    }    al.untimed(); // False -- lock grabbed by task    al.timed(); // False -- lock grabbed by task  }} 

View Code

在该例子中,前面两次尝试获取锁都可以成功,因为没有其他线程占用该锁,第三第四次获取锁失败,因为中间起了个线程,该线程获取锁后一直未释放;

原子性和可见性

原子性

原子操作是不能被线程中断机制中断的操作,如简单的读取和写入除long和double之外的基本数据类型操作;

如果想让long和double读取和写入操作变成原子性,可以使用volatile关键字;

原子操作看似很安全,但也需要同步,举个例子如下:

package concurrency;import java.util.concurrent.*;public class AtomicityTest implements Runnable {  private int i = 0;  public int getValue() {    return i;  }  private synchronized void evenIncrement() {    i++;    i++;  }  public void run() {    while (true)      evenIncrement();  }  public static void main(String[] args) {    ExecutorService exec = Executors.newCachedThreadPool();    AtomicityTest at = new AtomicityTest();    exec.execute(at);    while (true) {      int val = at.getValue();      if (val % 2 != 0) {        System.out.println(val);        System.exit(0);      }    }  }}

虽然evenIncrement方法加了锁,return i操作也是原子性的,但是通过getValue方法读数据的时候还是产生了奇数,因为getValue方法没有加锁,使其可以访问处于不稳定状态的i变量;

另外,JAVA中的自增操作不是原子性的,其中涉及一个读操作和写操作;

可见性

volatile关键字可以确保被修饰的域在不同线程之间的可见性,在对volatile域上的修改会被立即写到主存中,然后所有读取操作在主存中进行,保证不同线程的可见性;

注意同步也会导致数据向主存中刷新,因此如果一个域已经由synchronized关键字加锁,则不必设置为volatile;

另外,以下两种情况volatile是无法正常工作的,需要使用synchronized:

一个域的值依赖于它之前的值,如递增操作;

一个域的值受限于其它域的值;

总之,使用volatile唯一安全的情况是类中只有一个可变的域;

举个例子,SerialNumberGenerator 类中有一个获取唯一id的方法,代码如下:

package concurrency;public class SerialNumberGenerator {  private static volatile int serialNumber = 0;  public static int nextSerialNumber() {    return serialNumber++; // Not thread-safe  }}

在多线程情况下,调用该类的nextSerialNumber方法,会发现获取的值存在相同的情况,即使serialNumber变量加了volatile来修饰,但是volatile不能对递增不是原子性操作这一事实产生影响;

原子类

在java.util.concurrent.atomic包下有AtomicInteger、AtomicBoolean等特殊的原子性变量类,它们提供了compareAndSet操作,即CAS原子操作,是机器级别上的原子操作;

所谓CAS操作是一种乐观锁技术,是指在操作数据之前先判断数据是否修改过(通过预期值与内存值比较是否相等来判断),只有当数据没被修改过的时候,才写入数据;注意CAS操作存在ABA问题,即A被修改为B又被修改为A,导致无法判断A是否变化过,解决该问题可以通过引入版本号等手段;

下面给出一个使用原子类的例子,该例子未使用synchronized加锁(悲观锁),但同样能够正确运行:

package concurrency;import java.util.concurrent.*;import java.util.concurrent.atomic.*;public class AtomicIntegerTest implements Runnable {  private AtomicInteger i = new AtomicInteger(0);  public int getValue() {    return i.get();  }  private void evenIncrement() {    i.addAndGet(2);  }  public void run() {    while (true)      evenIncrement();  }  public static void main(String[] args) {    ExecutorService exec = Executors.newCachedThreadPool();    AtomicIntegerTest ait = new AtomicIntegerTest();    exec.execute(ait);    while (true) {      int val = ait.getValue();      if (val % 2 != 0) {        System.out.println(val);        System.exit(0);      }    }  }}

上面代码之所以不出现问题,是因为i.addAndGet(2)操作是一个原子操作;

 同步控制块

可以用synchronized(syncObject){}对某个代码段进行保护,防止多个线程同时访问该块区域,被保护的这段代码称为同步控制块;

某个线程想要进入同步代码块,需先获得syncObject对象的锁;

使用同步代码块比使用整个方法进行同步控制性能要好;

还可以用Lock对象创建同步控制块(lock和unlock方法);

线程本地存储ThreadLocal

防止多线程访问共享资源产生异常结果的另外一种方式是消除资源的共享,可以使用线程本地存储ThreadLocal达到这样的效果,即为每个线程创建一个共享资源的存储,从而消除多个线程对该资源的共享访问;

可以通过get和set方法访问该对象的内容,以下代码为线程本地存储的例子,从输出结果可以看出,每个线程对value的访问相互独立,互不干扰,因为它们访问的本来就不是同一个资源;

package concurrency;import java.util.concurrent.*;import java.util.*;class Accessor implements Runnable {  private final int id;  public Accessor(int idn) {    id = idn;  }  public void run() {    while (!Thread.currentThread().isInterrupted()) {      ThreadLocalVariableHolder.increment();      System.out.println(this);      Thread.yield();    }  }  public String toString() {    return "#" + id + ": " + ThreadLocalVariableHolder.get();  }}public class ThreadLocalVariableHolder {  private static ThreadLocal<Integer> value = new ThreadLocal<Integer>() {    private Random rand = new Random(47);    protected synchronized Integer initialValue() {      return rand.nextInt(10000);    }  };  public static void increment() {    value.set(value.get() + 1);  }  public static int get() {    return value.get();  }  public static void main(String[] args) throws Exception {    ExecutorService exec = Executors.newCachedThreadPool();    for (int i = 0; i < 5; i++)      exec.execute(new Accessor(i));    TimeUnit.SECONDS.sleep(3); // Run for a while    exec.shutdownNow(); // All Accessors will quit  }} 

...#1: 4838#3: 12329#1: 4839#3: 12330#1: 4840#3: 12331#1: 4841#2: 5936#3: 12332#0: 14259#4: 4979...

 参考资料:JAVA编程思想--4