Java并发

基础 #

并发编程三大特性
    可见性(visibility)
        synchronized会刷新线程栈
    有序性(ordering)
    原子性(atomicity)

CPU #

PC寄存器(Program Counter Register, 存下一指令)
优化
    乱序读
    合并写
    乱序执行(流水线)
ALU
    访问速度
        Registers: < 1ns
        L1 cache(核内): 1ns
        WC(Writer Comblining) Buffer    # 合并写, 一般4个字节
        L2 cache(核内): 3ns
        L3 cache(CPU内): 15ns
        memory: 80ns
局部性原理
    空间
        按块读取(cache line)
            一次读64Bytes               # disruptor RingBuffer实现前后7个long,两个元素不同行,避免缓存一致性协议的通知
            Java1.8注解@Contended       # 保证不在同一行,需要JVM参数-XX:-RestrictContended
    时间
        批量读指令
内存屏障                        # 不同CPU不一样
    Intel
        sfence                      # save, 之前写操作必须完成
        lfence                      # load, 之前读操作必须完成
        mfence                      # mix(save + load)
lock指令                            # 指令执行完之前,锁内存
    lock_add

CPU缓存一致性协议 #

# 是缓存锁。无法被缓存、跨越多个缓存的数据,依然总线锁
状态
    MESI Cache一致性协议                    # Intel CPU,实现方式为主动监听
        Modified                            # 一行数据在CPU Modified, 其它CPU为Invalid
        Exclusive                           # 独享,其它CPU没有
        Shared                              # 别人也在读
        Invalid                             # 别人改过了
    MSI
    MOSI
缓存行                                      # 多数64字节
    伪共享                                  # 缓存行内无关数据也要同步

OS #

进程
    # 资源分配
线程
    # 调度执行
    切换(Context Switch)
        CPU导出原线程指令和data到cache, 再导入新线程数据
    线程数
        N(threads) = N(CPU) * U(CPU) * (1 + W/C)
            # N(CPU): CPU数
            # U(CPU): 期望CPU利用率, 0到1
            # W/C: wait时间/compute时间, (1+W/C)即运行时间/计算时间。用profiler、arthas分析
        压测决定
CPU性能压榨
    单进程
    批处理
    多进程切换
    多线程进程内切换
        I/O复用
    协程(纤程、绿色线程)用户空间切换

JVM #

内存屏障 #

LoadLoad
StoreStore
LoadStore
StoreLoad

乱序执行(指令重排序) #

为什么
    CPU快, 指令1阻塞时,指令2先执行
判断
    代码不影响单线程最终一致性
例子,多线程
        static int x = 0, y = 0;
        static int a = 0, b = 0;
        void main(){
            for (long i = 0; i < Long.MAX_VALUE; i++>) {
                x=0;
                y=0;
                a=0;
                b=0;
                CountDownLatch latch = new CountDownLatch(2);

                Thread one = new Thread(() -> {
                    public void run() {
                        a = 1;
                        x = b;
                        latch.countDown();
                    }
                });
                
                Thread other = new Thread(() -> {
                    public void run() {
                        b = 1;
                        y = a;
                        latch.countDown();
                    }
                });
                one.start();
                other.start();
                latch.await();
                if (x == 0 && y == 0) {
                    break;
                }

            }
        }
例子,类初始化指令换顺序
        class T {
            int m = 8;
        }
        T t = new T();
        汇编码
            0 new #2 <T>
                # 变量半初始化状态为0
            3 dup
            4 invokespecial #3 <T.<init>>
            7 astore_1
                # 4,7乱序执行, 先建立了关联再初始化, 变量中间状态值为0线程访问时中间状态逸出
            8 return
        class C {
            private int num = 0;
            public C() {
                new Thread(() -> System.out.println(this.num)).start();
            }
            void main() {
                new C();
                System.in.read();
            }
        }

Unsafe类 #

# 相当于指针。1.8只能根部类用, 11可以直接用
操作内存
    allocateMemory
    putXX
    freeMemory
    pageSize
生成类实例
    allocateInstance
操作类或实例
    objectFieldOffset
    getInt
    getObject
CAS
    compareAndSwapObject()          # JDK1.8
    compareAndSetObject()
    weakCompareAndSetObject()       # JDK11

修饰符与锁 #

概念 #

锁细化                              # 少代码加轻量锁
锁粗化                              # 锁太多时,如行锁变表锁

synchronized #

实现方式
    JVM没要求实现方式
    早期都是OS调度
    HotSpot
        在对象头(64位)上用2位实现,组合成锁的类型
        锁升级                      # 不比原子类慢,升完不降
            偏向锁, 第一个线程第一次访问只记线程id
            自旋锁,线程争抢时,JDK6旋10次,现在为CPU内核数一半。非公平
            重量级锁,OS调度,线程WAIT。符合执行时间长,线程多的场景
原子性、可见性
可重入                              # 同一个对象同线程可重入
加锁对象
    方法锁和对象锁锁this
    static方法锁和类锁锁class类
    继承时锁的子对象(因为是this), 调super synchronized方法也锁子对象
使用注意
    抛异常立即释放锁,但被锁数据已更新
    不能用的对象
        String常量,可能未知地方锁同一个
        Integer、Long等基本类型, 值变化会生成新对象
    synchronized的属性加final防止赋值

volatile #

# 用synchronized性能差不多,volatile一般不用
作用
    # 没有原子性,可能写同一值
    变量在线程见可见性
        依靠CPU缓存一致性协议
    禁止指令重排序                  # 用JVM的读写屏障
        
修饰引用类型,内部属性不监控

DCL(Double Check Lock)单例volatile问题
        private static volatile C c;    // 禁止了创建c指令重排序
        private C(){}
        public static C getInstance() {
            if (c == null) {
                synchronized (C.class) {
                    if (c == null) {
                        // 申请内存(半初始化状态默认0),成员变量初始化,赋值
                        // 先赋值未初始化时,线程2判断非空,返回了半初始化状态的对象
                        c = new C();    
                    }
                }
            }
            return c;
        }

CAS #

# Compare And Set/Swap, 无锁优化, 乐观锁, 自旋
# Unsafe类支持
CPU原语
    cas(V, Expected, NewValue)
        if V == E                   # 无并发值判断问题,原语上加了屏障
        V = New
        else try again or fail
Java
    AtomicInteger
        incrementAndGet()
ABA问题
    # 线程1读取标记, 线程2改过又改回来,线程1判断标记锁住了提交了业务数据
    版本号                          # Java版本号类AtomicStampedReference
LongAdder
    LongAdder每次加数字, LongAccumulator用lambda
    分段锁(CAS)。值分开放数组里, 多线程对应一个item
性能测试
    LongAdder(713) > Atomic(2166) > Synchronized(3129)

#

AQS #

# AbstractQueueSynchronizer, CLH(Craig, Landin, and Hagersten)队列锁的变种
# 实现方式: CAS,volatile, 模板方法
类图
    AbstractQueueSynchronizer
        Sync
            NonfairSync
方法
    AbstractQueueSynchronizer
        # 一个state和一个双向链表,双向链表看前一结点状态(如持有时等待)
        Node
            volatile Node prev
            volatile Node next
            volatile Thread thread
        VarHandle
            # JDK1.9,保存引用,普通属性原子操作。
            # 相比反射,直接操作二进制码
            get()
            set()
            compareAndSet()         # 原子性
            getAndAdd()             # 原子性
            class C {
                int x = 0;
                private static VarHandle handle;
                static {
                    handle = MethodHandles.lookup().findVarHandle(C.class, "x", int.class)
                    handle.compareAndSet(c, 0, 1);

                }
            }
        volatile state              # 多态实现
        acquire()
        tryAcquire()                # 模板方法
        acquireQueued()             # 获得
        addWaiter(Node.EXCLUSIVE)   # 放入队列,排他锁或共享锁, CAS设置tail(从前锁整表)
        cancelAcquire()             # status CANCELLED, tail时设置null, 非tail时unpark下一节点
    NonfairSync
        nonfairTryAcquire()

ReentrantLock #

    # 可重入锁,CAS实现, state记重入多少次
    new ReentrantLock(true)         # 公平锁
    tryLock(long, TimeUnit)
    lockInterruptibly()             # 响应interrupt()标记
    newCondition()                  # 多一个等待队列

    源码
        调NonfairSync

CountDownLatch #

    # 比join()灵活
    new CountDownLatch(4)
    countDown()
    await()

CyclicBarrier #

    # 满了一起放行, 场景如I/O批量消费
    new CyclicBarrier(4, ()->{})
    await()

Phaser #

    # 阶段批量执行过滤
    class MyPhaser extends Phaser {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            switch(phase) {
                case 0:
                    print("arrived" + registeredParties);
                    return false;
                case 1:
                    print("eated" + registeredParties);
                    return false;
                case 2:
                    print("hugged" + registeredParties);
                    return true;
                default:
                    return true;
            }
        }
    }
    Person implements Runnable {
        private int i;
        public Person(int i) {
            this.i = i;
        }
        public void arrive() {
            phaser.arriveAndAwaitAdvance();
        }
        public void eat() {
            phaser.arriveAndAwaitAdvance();
        }
        public void hug() {
            if (i == 0 || i == 1) {
                phaser.arriveAndAwaitAdvance();
            } else {
                phaser.arriveAndDeregister();
            }
        }

        @Override
        public void run() {
            arrive();
            eat();
            hug();
        }
    }
    phaser = new MyPhaser();
    phaser.bulkRegister(5);
    for (int i = 0; i < 5; i++) {
        new Thread(new Person(i)).start()
    }

ReadWriteLock #

    # 读锁是共享锁,不能写,悲观锁
    # 写锁是排他锁,不能读写
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    Lock readLock = readWriteLock.readLock();
    Lock writeLock = readWriteLock.writeLock();
    void read(Lock lock) {
        lock.lock()
        lock.unlock()
    }
    void write(Lock lock) {
        lock.lock()
        lock.unlock()
    }
    for (int i =0; i<10;i++) {
        new Thread(()->read(readLock)).start();
    }
    for (int i =0; i<2; i++) {
        new Thread(()->write(writeLock)).start();
    }

StampedLock #

    # 读时允许写,读完判断不一致再读一遍,乐观锁

Semaphore #

    # 信号量, 限流同时运行, 用于线程间同步。可设置公平
    Semaphore s = new Semaphore(1, true)
    new Thread(() -> {
        s.acquire();
        s.release()
    })

Exchanger #

    # 线程间通信, 阻塞交换
    exchange()

LockSupport #

    # 线程间通信,非锁阻塞,指定线程唤醒
    # 线程启动后,unpark()可以在park()前调用生效, make(chan struct{}, 1)
    Thread t = new Thread(() -> {
        for (int i = 0; i < 10; i++) {
            if (i == 5) {
                LockSupport.park();
            }
        }
    })
    t.start();
    TimeUnit.SECONDS.sleep(1);
    LockSupport.unpark(t);

线程及线程池 #

使用注意 #

为什么不用Executors线程池
    用LinkedBlockingQueue超数量OOM
    拒绝策略
    线程命名

#

    Object
        wait()                              # 释放synchronized锁并加入等待队列唤醒后执行需要得到synchronized锁
        notify()                            # 只唤醒不释放当前synchronized锁
    Thread
        static currentThread()
        static sleep()
        static yield()                      # 让出CPU, 进Ready队列
        start()
        getState()
        join()                              # 等待结束
        setDaemon()                         # 是否后台
        setPriority(Thread.NORM_PRIORITY)   # 优先级没有用


    interface Runnable
        void run()
    interface Callable
        V call() throws Exception
    interface Future
        get()
        get(long, TimeUnit)
        cancel(boolean)
        isCanceled()
        isDone()
        interface RunnableFuture
            class FutureTask
        interface CompletableFuture         # parallel
            static CompletableFuture<U> supplyAsync()
            static CompletableFuture<Void> allOf(CompletableFuture<U>...)
            static CompletableFuture<Void> anyOf(CompletableFuture<U>...)
            T join()
            CompletableFuture<U> thenApply()
            CompletableFuture<Void> thenAccept(Consumer<T>)

    interface ThreadFactory
        Thread newThread(Runnable)
        class DefaultThreadFactory
    interface Executor
        void execute()
        interface ExecutorService
            shutdown()
            shutdownNow()
            isShutdown()
            isTerminated()
            awaitTermination(long, TimeUnit)
            Future submit(Callable<T>)
            Future submit(Runnable, T)                     # 手动设个result
            submit(Runnable)
            invokeAll(Collection<Callable<T>>)
            invokeAll(Collection<Callable<T>>, long, TimeUnit)
            invokeAny(Collection<Callable<T>>)
            invokeAny(Collection<Callable<T>>, long, TimeUnit)
            abstract AbstractExecutorService
                RunnableFuture<T> newTaskFor(Runnable, T)
                RunnableFuture<T> newTaskFor(Callable<T>)
                T doInvokeAny(Collection<Callable<T>>, boolean timed, long)
                submit()
                invokeAll(Collection<Callable<T>>)
                    # 忽略CancellationException, ExecutionException其它异常抛出并取消未完成任务
                invokeAll(Collection<Callable<T>>, long, TimeUnit)
                    # 忽略CancellationException, ExecutionException, TimeoutException其它异常抛出并取消未完成任务
                invokeAny(Collection<Callable<T>>)
                invokeAny(Collection<Callable<T>>, long, TimeUnit)
                class ThreadPoolExecutor                    # 线程池+任务队列
                    # 任务顺序: 核心线程, 任务队列起新线程拒绝策略
                    class ScheduledThreadPoolExecutor       # 用DelayedWorkQueue
                        scheduleAtFixedRate(()->{}, int initial, int period, TimeUnit)
                class ForkJoinPool
                    execute(ForkJoinTask)
            interface ScheduledExecutorService
                    [class ScheduledThreadPoolExecutor]
    interface CompletionService                             # 不阻塞全部任务已有结果入队列
        poll()
        class ExecutorCompletionService


    static class Executors
        newSingleThreadExecutor()                           # 为了用任务队列和生命周期管理
        newCachedThreadPool()                               # 超时60s, max为MAX_VALUE, 任务不堆积场景
        newFixedThreadPool()
        newScheduledThreadPool()                            # AbstractQueuedSynchronizer
        newWorkStealingPool()                               # ForkJoinPool, go的M,G,P
            # 每个线程单独队列, 尾部偷加尾部

创建线程 #

# 继承
class MyThread extendws Thread {
    @Override
    public void run(){}
}
new MyThread().start();

# 组合
class MyRun implements Runnable {
    @Override
    public void run(){}
}
new Thread(new MyRun()).start();

# 返回值
class myCall implements Callable<String> {
    @Override
    public String call(){}
}
FutureTask = ft = new FutureTask<String>(new MyCall())
new Thread(ft).start();
ft.get();

# 线程池
// execute无返回值
ExecutorService service = Executors.newCachedThreadPool()
service.execute(()->{});
// submit有返回值 
Future<String> f = service.submit(new MyCall());
service.shutdown();

线程状态 #

NEW
RUNNABLE            # 可调度
    READY
    RUNNING
WAITING             # 等待唤醒,忙等待(一直占CPU)
    o.wait()
    t.join()
    LockSupport.park()
    Lock.lock()

    o.notify()
    o.notifyAll()
    LockSupport.unpark()
    Lock.unlock()
TIMED WAITING
    Thread.sleep(time)
    o.wait(time)
    t.join(time)
    LockSupport.parkNanos()
    LockSupport.parkUntil()
BLOCKING            # 阻塞等待(不占CPU但经过OS调度)
    synchronized
TERMINATED

线程打断 #

方法 
    interrupt()                 # 设置打断标记位
    isInterrupted()             # 检查标记位
    static interrupted()        # 检查当前线程标记位,并重置
检测当前线程打断标记的方法      # 抛异常并重置
    Thread.sleep()
    o.wait();
    o.join();
    ReentrantLock
        lockInterruptibly()
不检测当前线程打断标记的方法
    synchronized                # 不是代码实现检测不了
    ReentrantLock
        lock()
强制打断
    Thread
        stop()                  # 已废弃, 立即释放所有锁
        suspend()               # 已废弃,强制暂停,所有锁不释放容易死锁
        resume()                # 已废弃,强制恢复
volatile
    判断数字不准,有同步的时间延迟, interrupt()也有延迟
    也需要代码中判断, 但interrupt()有wait()等系统方法支持

线程间通信 #

通知 #

# synchronized wait() notify(), CountDownLatch, LockSupport
volatile List c = new ArrayList();
final Object lock = new Object();
new Thread(() -> {
    synchronized(lock) {
        if (c.size() != 5) {
            lock.wait();
        }
        lock.notify();              // 唤醒t1
    }
}, "t2").start();

TimeUnit.SECONDS.sleep(1);

new Thread(() -> {
    synchronized(lock) {
        for (int i = 0; i < 10; i++) {
            c.add(new Object());
            if (c.size() == 5) {
                lock.notify();
                lock.wait();        // 让出sychronized锁
            }
        }
    }
}, "t1").start();

生产消费 #

# 优化count可以用CAS加(有ABA问题)
class MyContainer<T> {
    final private List<T> list = new LinkedList<>();
    final private int MAX = 10;
    private int count = 0;

    public synchronized void put(T t) {
        while(list.size() == MAX) {
            this.wait();            // 期间可能有add() 
        }
        list.add(t);
        count++;
        this.notifyAll();           // 应该只唤醒消费者
    }

    public synchronized T get() {
        T t = null;
        while(list.size() == 0) {
            this.wait();
        }
        t = list.removeFirst();
        count--;
        this.notifyAll();           // 应该只唤醒生产者
        return t;
    }
}

# 同步容器, ReentrantLock Condition
private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();

public void put(T t) {
    try {
        lock.lock();
        while(list.size() == MAX) {
            producer.await();
        }
        list.add(t);
        count++;
        consumer.signalAll();
    } finally {
        lock.unlock();
    }
}

public T get() {
    T t = null;
    try {
        lock.lock();
        while(list.size() == 0) {
            consumer.await();
        }
        t = list.removeFirst();
        count--;
        producer.signalAll();
    } finally {
        lock.unlock();
    }
    return t;
}

协程 #

quasar库                            # 需要设javaagent, 每个fiber生成栈
    fiber =  new Fiber<Void>()
    fiber.start()

并发API #

Stream #

parallelStream()                    # ForkJoinPool

ThreadLocal #

内部类
    ThreadLocalMap<ThreadLocal, Object>
        # 存在每个线程里。场景如声明式事务拿conn
        # key是弱引用指向ThreadLocal, value是强引用。
    Entry extends WeakReference<ThreadLocal<?>> {
        Object value;
        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }
方法
    set(T)
内存泄露问题
    ThreadLocal<M> tl = new ThreadLocal();
    tl.set(new M());
    tl = null;
        # threadLocalMap中key弱引用回收, value不回收
    tl.remove();
        # 必需remove()否则内存泄露, threadLocalMap中value强引用,tl回收了也一直存在

PipedStream #

# 效率不高
PipedInputStream
    connect(PipedOutputStream)
    read(byte[])
PipedOutputStream
    write(byte[])

JMH #

# Java Microbenchmark Harness
概念
    Warmup                              # 预热
    Mesurement                          # 总执行次数
    Timeout                             # 每次执行超时时间
    Threads                             # fork线程数
    Benchmark mode                      # 模式
    Benchmark                           # 方法名
环境变量
    TEMP或TMP                           # JHM临时文件存放
使用
    @Benchmark
    @Warmup(iterations = 2, time = 3)           # 执行2次, 每次隔3秒
    @Fork(5)                                    # 多少线程
    @BenchmarkMode(Mode.Throughput)             # 显示每秒多少次
    @Measurement(iterations = 10, time = 3)     # 共测10次, 每次隔3秒
    public void testA() {

    }

Disruptor #

介绍
    CAS, 环形数组Buffer
        数组用sequence定位修改快,也避免了头尾加锁
        直接覆盖降低GC
            覆盖前有等待策略
    单机MQ
        发布订阅模式
        观察者模式
    EventFactory
        会调工厂提前分配内存, 使用时不new而是修改值,提高效率, 降低GC
使用
    class MyEvent {}
    class MyEventFactory implements EventFactory<MyEvent> {
        @Override
        public MyEvent newInstance() {}
    }
    class MyEventHandler implements EventHandler<MyEvent> {
        @Override
        void onEvent(MyEvent, long sequence, boolean endOfBatch) {}
    }
    class MyExceptionHandler implements ExceptionHandler<MyEvent> {
        @Override
        void handleEventException()
        @Override
        void handleOnStartException()
        @Override
        void handleOnShutdownException()
    }
    disruptor = new Disruptor<>(factory, 1024, Executors,defaultThreadFactory())
    disruptor = new Disruptor<>(MyEvent::new, 1024, Executors,defaultThreadFactory())
    disruptor = new Disruptor<>(factory, 1024, Executors,defaultThreadFactory(), 
        ProducerType.SINGLE, new BlockingWaitStrategy())
        # 默认ProducerType.MULTI, SINGLE可提高性能不用加锁
    // 消费
    disruptor.handleEventsWith(handler1, handler2)
    disruptor.handleEventsWith((event,seq,end)->{})
    disruptor.handleExceptionsFor(handler1).with(excptionHandler1)

    disruptor.start()

    // 生产
    ringBuffer = disruptor.getRingBuffer()
    sequence = ringBuffer.next()
    event = ringBuffer.get(sequence)
    event.set("")
    ringBuffer.publish(sequence)
    translator = new EventTranslator<>() {
        @Override
        void translateTo(event, sequence) {
            event.set("")
        }
    }
    ringBuffer.publishEvent(translator)
    ringBuffer.publishEvent((event,seq, "") -> event.set(l), "")
等待策略
    BlockingWaitStrategy                # 阻塞直到再次唤醒
    BusySpinWaitStrategy                # 自旋等待
    SleepingWaitStrategy                # sleep等待
    LiteBlockingWaitStrategy            # 同BlockingWaitStrategy减少加锁次数                
    LiteTimeoutBlockingWaitStrategy     # 同LiteBlockingWaitStrategy加超时            
    PhasedBackoffWaitStrategy
    TimeoutBlockingWaitStrategy         # 同BlockingWaitStrategy加超时                
    YieldingWaitStrategy                # 尝试100然后Thread.yield()

源码分析 #

ThreadPoolExecutor #

new ThreadPoolExecutor()                    
    int corePoolSize                    # 核心线程数, 永远存活。可设置参与回收
    int maximumPoolSize                 # 最大线程数
    long keepAliveTime                  # 生存时间
    TimeUnit
    BlockingQueue<Runnable>             # 任务队列
    ThreadFactory                       # 线程工厂, 设线程名
    RejectedExecutionHandler            # 拒绝策略
        Abort                           # 抛异常
        Discard                         # 忽略掉
        DiscardOldest                   # 忽略掉排除最久的
        CallerRuns                      # 调用者线程执行, 再多就阻塞
AtomicInteger ctl
    # 高3位线程池状态,低29位线程数量
void execute()
    判断添加核心线程
    放入队列成功
        拒绝或添加非核心线程
    添加非核心线程失败
        拒绝
boolean addWorker(Runable, boolean)
    线程数量加1
    添加Worker
        加锁
        加线程
        启动
class Worker extends AbstractQueuedSynchronizer implements Runnable
    # 本身是AQS锁, 被多任务(线程)访问
    Tread thread

ForkJoinPool #

abstract class ForkJoinTask
    ForkJoinTask<V> fork()
    V join()
    abstract class RecursiveAction          # 无返回值
        void compute()
    abstract class RecursiveTask            # 有返回值
例子
    class MyTask extends RecursiveTask<Long> {
        int start;
        int end;
        @Override
        Long compute() {
            if (end - start <= MAX_NUM) {
                return sum
            }
            subTask1 = new MyTask(start, mid)
            subTask2 = new MyTask(mid, end)
            subTask1.fork()
            subTask2.fork()
            return subTask1.join() + subTask2.join();
        }
    }
    fjp = new ForkJoinPool()
    task = new MyTask(0, nums.length)
    fjp .execute(task)
    result = task.join()