Java并发编程实战 读书笔记
@ Shen Jianan · Sunday, Nov 29, 2015 · 10 minute read · Update at Nov 29, 2015

Java并发编程实战——一本近300页的薄书,但是却延宕了相当长一段时间。中间上公开课,转职……注意力完全不在这本书上,到现在也只看了一半。想来应该做个笔记,就先把之前看的一半笔记写上,一并算是复习了。

对象的共享

可见性

所谓可见性就是变量在被访问的时候一定是最新的值。下面的程序展示了一种不满足可见性的情况:

public class NoVisibility {
    private static int number=0;
    private static boolean ready = false;

    private static class ReaderThread extends Thread{
        public void run() {
            while (!ready)
                Thread.yield();
            System.out.println(number);
        }
    }

    public static void main(String[] args){
        new ReaderThread().start();

        ready = true;
        number = 42;
    }
}

这样一个简单的程序,可能会死循环或输出0,因为编译器、处理器等会对操作的顺序进行调整,称为“重排序”,因为在这里主线程和readerThread没有共享数据。

最低安全性与64位操作

虽然在没有同步的情况下会读取到失效值,但至少是某个线程设置的值而不是随机数,这被称为最低安全性。上面的NoVisibility就满足最低安全性,number只可能是0或者42。但是并不是所有情况都能满足最低安全性。当操作64位数的时候,比如非volatile的long和double,就不满足最低安全性,因为JVM允许将它们分为两个32位操作,而这两个32位操作如果在不同线程中执行,那么很可能只读到一半的32位内容。

加锁的含义不仅仅局限于互斥行为,还包括内存可见性。加锁机制确保了本线程的写入值在其他线程是可见的,而不是让其他线程读到了失效值。

volatile变量

加锁机制能够保证变量的可见性,但是开销却难免比较大,而volatile是稍弱的同步机制,确保将变量的更新操作通知到其他线程。

可见性问题的本质就是编译器为了优化而进行指令的重排序和寄存器缓存,当声明为volatile之后,编译器与运行时不会将该变量上的操作与其他内存操作一起重排序。volatile变量不会被缓存到寄存器上,所以读取volatile类型的变量时总会返回最新写入的值。

volatile和加锁在操作原子性上的差异

访问volatile变量不会执行加锁操作,所以不会使线程阻塞,也就不能保证原子性。而加锁操作既可以保证可见性又可以保证原子性。因此,我写了一小段自增1000次到10000的多线程代码,比较volatile和内置锁不同的表现。

volatile

volatile原子性表现

最后看到的值没有到达10000(出现频率大概在10%),说明没有做到原子写(写操作不能被影响)。

内置锁(synchronized)

加锁原子性表现

最后看到的值都到达了10000,说明写操作是原子的

使用volatile的条件

根据volatile只保证可见性而不保证原子性的特征,可以在以下的情况下替代加锁操作,毕竟加锁操作的开销很大。

  1. 写入值不依赖值的当前状态,或者确保只有单个线程更新变量的值
  2. 该变量不会与其他状态变量一起纳入不变性条件中
  3. 在访问时不需要加锁

线程封闭的方法

线程封闭是指仅在单个线程中访问数据的技巧,这样可以避免共享数据而需要的同步。

Ad-hoc线程封闭

自己专门设计一个程序来维护线程的封闭性,Ad hoc是拉丁文常用短语中的一个短语。这个短语的意思是“特设的、特定目的的(地)、即席的、临时的、将就的、专案的”。这个短语通常用来形容一些特殊的、不能用于其它方面的的,为一个特定的问题、任务而专门设定的解决方案。比如开发一个只有单个线程修改volatile的程序。

栈封闭

栈封闭将变量封闭在方法体中,只能通过局部变量才能访问到对象。

对于基本类型,由于没有引用,无论如何都不会破坏栈封闭性。对于对象,要注意不要将需要封闭的对象逸出,被外部线程访问到。这样的话,对象位于执行线程的栈中,其他线程无法访问这个栈。

ThreadLocal类

ThreadLocal是更规范的一种做法,它能够使线程中的某个值与保存值的对象关联起来。

ThreadLocal提供了get与set等访问接口或方法,这些方法为每个使用该变量的线程都存有一份独立的备份。通常ThreadLocal用于防止对可变的单实例变量或全局变量进行共享。当第一次使用get时,ThreadLocal通过initalValue获得初始值

ThreadLocal与实例变量的区别

实例变量为对象实例私有,在java虚拟机的堆中分配,如果在系统中只存在一个此对象的实例,在多线程环境下,就像静态变量那样,被某个线程修改后,其他线程对修改均可见,故线程非安全;如果每个线程执行都是在不同的对象中,那对象与对象之间的实例变量的修改将互不影响,所以线程安全。 

下例中,使用ThreadLocal和实例变量的String分别创建20个线程,每个线程报告自己所看到的编号。这里使用实例变量的线程就会产生看到其他线程变量的情况,而ThreadLocal就不会产生这样的情况。

private static class ThreadWithOutLocal extends Thread{
    private static String strHolder = new String();

    private int value=0;

    public ThreadWithOutLocal(int _i){
        value = _i;
    }

    public void run(){
        strHolder="value "+value;
        for (int i=0;i<3;i++)
           System.out.println("thread "+value+" saw "+strHolder);
    }
}

public static void main(String args[]){
    ThreadWithOutLocal[] list = new ThreadWithOutLocal[20];
    for (int i=0;i<20;i++)
        list[i] = new ThreadWithOutLocal(i);

    for (int i=0;i<20;i++)
        list[i].start();
}

这里的代码输出就可能会产生下图的情况:

静态实例变量多线程的访问混乱

这里是因为static在所有类的实例中都可以访问,但除了static,就算是私有变量也有可能会因为使用同一个对象创建多个线程而产生问题:

public class ThreadLocalTest {  
    public static void main(String[] args) {  
        Runnable accumelatora = new Accumulatort();  
        Thread threada = new Thread(accumelatora, "ThreadA");  
        Thread threadb = new Thread(accumelatora, "ThreadB");  
        threada.start();  
        threadb.start();  
    }  
}  
class Accumulatort implements Runnable {  
    // 实例变量  
    int locals = 0;  
    @SuppressWarnings("unchecked")  
    public void run() {  
        for (int i = 0; i <= 10; i++) {  
            locals += 1;  
            try {  
                Thread.sleep(1000);  
            } catch (Exception e) {  
            }  
            System.out.println(Thread.currentThread().getName() + "-->" + locals);  
        }  
    }  
}  

这里的locals变量会产生大于10的结果,这可能就不是开发者所期望的效果了。

private static class ThreadWithLocal extends Thread{
    private static ThreadLocal<String> strHolder = new ThreadLocal<String>();

    private int value=0;

    public ThreadWithLocal(int _i){
        value = _i;
    }

    public void run(){
        strHolder.set("value "+value);
        for (int i=0;i<3;i++)
            System.out.println("thread "+value+" saw "+strHolder.get());
    }
}
public static void main(String args[]){
    ThreadWithLocal[] list = new ThreadWithLocal[20];
    for (int i=0;i<20;i++)
        list[i] = new ThreadWithLocal(i);

    for (int i=0;i<20;i++)
        list[i].start();
}

ThreadLocal以线程为单位,而不是以实例为单位,所以就杜绝了产生上面两种问题的可能性:

ThreadLocal多线程访问正常

基础构建模块

同步容器类

同步容器类包括Vector和Hashtable,它们会将所有对容器状态访问的操作都串行化,所以会严重降低并发性。在迭代的时候,也会对容器进行加锁,如果不想要受到这个影响的话,可以克隆容器并且在副本上进行迭代。

并发容器

并发容器针对多线程并发访问设计,比如ConcurrentHashMap,CopyOnWriteArrayList等。同步容器类在执行每个操作的时候都会持有一个锁,而并发容器使用粒度更细的加锁机制——分段锁。所以允许更多线程同时进行操作。

同步工具类

闭锁

闭锁可以用来确保某些活动直到其他活动都完成后才继续执行。CountDownLatch是一种灵活的闭锁实现,await会等待计数到达零。

CountDownLatch是一次性对象, 只有CountDown而没有CountUp方法

如下的代码中,startGate用于协调所有的线程同时开始,endGate用于标记所有的线程同时结束。只有在startGate降到0,线程才会开始,在endGate降到0,才会结束并得到endTime。

public class TestCountDownLatch {

    public static void main(String args[]){
        TestCountDownLatch test = new TestCountDownLatch();
        test.test(100,new MyTask());
    }

    public static class MyTask implements Runnable{

        @Override
        public void run() {
            int j=0;
            for (int i=0;i<100000;++i){
                ++j;
            }
        }
    }

    public void test(int nThread, final Runnable task){
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThread);

        for (int i=0;i<nThread;++i){
            Thread th = new Thread() {
                public void run() {
                    try {
                        startGate.await();

                        try {
                            task.run();
                        }finally {
                            endGate.countDown();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            th.start();
        }

        long startTime = System.nanoTime();
        startGate.countDown();
        try {
            endGate.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long endTime = System.nanoTime();
        System.out.println("the time is "+(endTime-startTime));
    }
}

FutureTask

FutureTask也可以用作闭锁。表示的计算是通过Callable来实现的,表示一种抽象的可生成结果的Runnable,处于等待、运行中和运行完成这三种状态中的一种。(运行完成也可能是运行失败或被中断等)

Future.get的行为是阻塞的,如果任务没有完成会一直阻塞。确保了结果的传递是安全发布的。

public class TestFutureTask {

//    package the FutureTask with callable task
    private final FutureTask<Integer> future = new FutureTask<Integer>(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            int pre=0;
            int after=1;
            int result = 1;

            for (int i=0;i<10000;++i){
                result = after+pre;
                pre = after;
                after = result;
            }

            return result;
        }
    });

    private final Thread thread = new Thread(future);

    public void test(){
        thread.start();

        try {
            System.out.println("calc result is "+future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    public static void main(String args[]){
        TestFutureTask testFutureTask = new TestFutureTask();
        testFutureTask.test();
    }
}

信号量Semaphore

信号量管理着一组虚拟的许可,release是释放一个信号量,acquire是申请一个信号量。如果没有获得许可,acquire将阻塞直到有许可。

Semaphore并不受限于构造函数中的许可数量。当release的时候,总会新增一个许可。

public class TestSemaphore {
    private static final Semaphore sem = new Semaphore(5);

    public static class MyRunnable implements Runnable{
        private int i;

        public void setI(int i){
            this.i = i;
        }

        @Override
        public void run() {
            try {
                sem.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("current i: " + i + "   start time: " + System.nanoTime());

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("current i:"+i+"   release time:"+System.nanoTime());
            sem.release();

        }
    }


    public void startTest(){
        for (int i=0;i<10;i++){
            MyRunnable myRunnable = new MyRunnable();
            myRunnable.setI(i);

            Thread th = new Thread(myRunnable);

            th.start();
        }
    }

    public static void main(String[] args){
        TestSemaphore test = new TestSemaphore();
        sem.release();          //additional release can make the semaphore to be more than 5
        test.startTest();
    }
}

这里的代码在main函数中额外release了一下,则可以看到前6个线程都直接申请到了信号量,而后四个则只能等其他线程释放信号量之后才得到信号量

信号量实验结果

栅栏

栅栏类似于闭锁,能阻塞一组线程直到某个事件发生。区别在于:闭锁的准许信号可以来自外部,也可以来自其他线程,只要能让CountDownLntch的信号到0即可,设置CountDownLntch的地方不一定在await;而栅栏是所有线程之间互相等待,直到所有线程都到达某一个栅栏。闭锁用于等待事件,而栅栏用于等待其他线程。

闭锁一旦到达终止状态,就不能被重置。而栅栏在所有线程都到达后,所有线程都会被放行,而栅栏将会被重置。如果超时或被中断,会抛出BrokenBarrierException。成功地通过栅栏之后,await将会为每个线程返回唯一的到达索引号。

当一个计算的下一步需要其他并行的计算结果的状态时,经常就需要栅栏。

CyclicBarrier

CyclicBarrier允许将一个Runnable传递给构造函数,当成功通过栅栏时会在一个子任务线程中自动执行这个Runnable。CyclicBarrier可以使一定数量的参与方反复地汇集在栅栏处,这在并行迭代算法中非常有用——通常将一个问题拆分成一系列相互独立的子问题。

public class TestCyclicBarrier {
    private static final int count = 5;

    private static int[] currentValue = new int[count*2];

    private static CyclicBarrier barrier = new CyclicBarrier(count, new Runnable() {
        @Override
        public void run() {
            //第一个到达的线程会运行这一行代码
            System.out.println("get the barrier!");
        }
    });


    public static class MyTask implements Runnable{
        int currentI;

        int currentBase;

        int nextBase;

        public MyTask(int i){
            this.currentI = i;
            currentBase = currentI;
            nextBase = currentI+count;
        }

        @Override
        public void run() {
            try {
                currentValue[currentBase] = 0;

                int multi = currentBase/count;

                for (int i = 0; i < count; ++i)
                    currentValue[currentBase] += currentValue[(1-multi) * count + i];

                int tmp = currentBase;
                currentBase = nextBase;
                nextBase = tmp;

                int index = barrier.await();
        
                System.out.println(index);

                }catch(InterruptedException e){
                    e.printStackTrace();
                }catch(BrokenBarrierException e){
                    e.printStackTrace();
                }
            }
    }

    public static void main(String args[]){
        for (int i=0;i<count*2;++i){
            currentValue[i] = 1;
        }

        for(int i=0;i<count;++i) {
            MyTask myTask = new MyTask(i);
            Thread th = new Thread(myTask);
            th.start();
        }
    }
}

CyclicBarrier实验结果

构建高性能可伸缩结果缓存的实践

假设条件:有一个非常耗时的计算操作,希望将计算结果存入一个Map来避免重复计算

使用ConcurrentHashMap依然可能会导致重复计算:如果发现某一个key没有被计算过,但其实另一个线程正在进行耗时的计算,计算后就会把这个键put进去,那依然会进行重复计算。

优化1

使用FurureTask来进行计算,存储Map内容为ConcurrentHashMap<key,Future>,这样,当一个线程正在计算result时,HashMap中已经存有Key,即使没有计算完成也会阻塞地进行读取。

public class FutureCachedPool {
    static ConcurrentHashMap<Integer,Future<Integer>> pool = new ConcurrentHashMap<Integer, Future<Integer>>();

    public static Integer getResult(final int key){
        Future<Integer> result = pool.get(key);
        if (result==null){
            Callable<Integer> calcuTask = new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    return ComplexCalculation.complexCalculation(key);
                }
            };
            FutureTask<Integer> calcResult = new FutureTask<Integer>(calcuTask);
            result = calcResult;
            pool.put(key,calcResult);
            calcResult.run();
        }

        try {
            return result.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        return null;
    }
}

存留问题

依然可能导致进行重复计算:判断是否存在此key和put一个key之间的空隙,依然可能导致脏读而重复计算:

构造高性能缓存_检查与put之间的间隙

执行结果中些许的重复计算:

构造高性能缓存_执行结果中些许的重复计算

优化2

使用putIfAbsent来解决可能重复put的问题,如果该方法返回null,说明之前没有已经插入的内容,则可以开始计算。

优化后的代码:

public class BetterFutureCachedPool {
    static ConcurrentHashMap<Integer,Future<Integer>> pool = new ConcurrentHashMap<Integer, Future<Integer>>();

    public static Integer getResult(final int key){
        Future<Integer> result = pool.get(key);
        if (result==null){
            Callable<Integer> calcuTask = new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    return ComplexCalculation.complexCalculation(key);
                }
            };
            FutureTask<Integer> calcResult = new FutureTask<Integer>(calcuTask);

//            only calculates when there is no conflict
            result = pool.putIfAbsent(key,calcResult);
            if (result==null) {
                result = calcResult;
                calcResult.run();
            }
        }

        try {
            return result.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        return null;
    }

}

计算情况如下,可以发现没有如上面那样的重复计算了: https://shenjianan-blog.oss-cn-beijing.aliyuncs.com/images/notes/java并发编程实战/构造高性能缓存_没有重复计算.png title:构造高性能缓存_没有重复计算

其他问题

Future可能会导致缓存污染的问题:如果Future计算失败了或超时了,在Map中依然存有此Future,所以需要在超时或失败之后将此Future从Map中移除

About Me

2018.02至今 杭州嘉云数据 算法引擎

2017.6-2017.12 菜⻦网络-⼈工智能部-算法引擎

2016.09-2018.06 南京大学研究生

2015.07-2015.09 阿里巴巴-ICBU-实习

2012.09-2016.06 南京大学本科