Yolofyi's Guide
首页
  • 前端文章

    • JavaScript
    • HTML
    • CSS
  • 学习笔记

    • 《JavaScript教程》
    • 《JavaScript高级程序设计》
    • 《ES6 教程》
    • 《Vue》
    • 《React》
    • 《TypeScript 从零实现 axios》
    • 《Git》
    • TypeScript
    • JS设计模式总结
  • Mysql

    • Mysql
  • Java

    • Java基础
  • 技术文档
  • GitHub技巧
  • Nodejs
  • 博客搭建
  • 学习
  • 面试
  • 助手
收藏
  • 分类
  • 标签
  • 归档

Yolofyi

船是自己,灯塔是自己,岸也是自己
首页
  • 前端文章

    • JavaScript
    • HTML
    • CSS
  • 学习笔记

    • 《JavaScript教程》
    • 《JavaScript高级程序设计》
    • 《ES6 教程》
    • 《Vue》
    • 《React》
    • 《TypeScript 从零实现 axios》
    • 《Git》
    • TypeScript
    • JS设计模式总结
  • Mysql

    • Mysql
  • Java

    • Java基础
  • 技术文档
  • GitHub技巧
  • Nodejs
  • 博客搭建
  • 学习
  • 面试
  • 助手
收藏
  • 分类
  • 标签
  • 归档
  • Mysql

  • Java

    • Java基础
    • Java并发
      • 0.关于线程你需要搞懂这些:
      • 1. 基本概念
      • 2. 线程的启动
        • 2.1 实现 Runnable 接口
        • 2.2 继承 Thread 类
        • 2.3 实现 Callable 接口
      • 3. 线程的状态
        • 初始态:NEW
        • 运行态:RUNNABLE
        • 就绪态 READY
        • 运行态 RUNNING
        • 阻塞态 BLOCKED
        • 等待态 WAITING
        • 超时等待态 TIME_WAITING
        • 终止态
      • 4. 线程的方法
        • getName
        • isAlive
        • sleep(当前线程.sleep)
        • join(其他线程.join)
        • yield(当前线程.yield)
        • interrupt(其他线程.interrupt)
        • wait(对象.wait)
        • notify(对象.notify)
        • wait&notify 最佳实践
        • suspend resume stop destroy(废弃方法)
        • 线程的优先级
        • 守护线程
        • 未捕获异常处理器
      • 线程引入开销:上下文切换与内存同步
      • 死锁
      • synchronized
        • 一种使用方法是对代码块使用 synchronized 关键字
        • 另一种写法是将 synchronized 作为方法的修饰符
      • Lock
      • volatile
      • Atomic
      • 可重入锁 ReentrantLock
      • Condition(与 wait&notify 区别)
        • Condition 与 wait&notify 区别
        • await&signal
      • 公平锁
      • 读写锁 ReentrantReadWriteLock
      • LockSupport(锁住的是线程,synchronized 锁住的是对象)
      • synchronized 与 Lock 的区别
      • JMM Java Memory Model
        • JMM 抽象结构
        • 指令重排序
        • 内存屏障
        • happens-before(抽象概念,基于内存屏障)
      • 指令重排序
        • 数据依赖性
        • as-if-serial
      • 顺序一致性
      • volatile 原理
        • 汇编上的实现
        • 内存语义
        • 内存语义的实现(内存屏障)
      • synchronized 原理
        • monitor
        • 汇编上的实现(cmpxchg)
        • Java 对象头
        • 锁的分类
        • 偏向锁(只有一个线程进入临界区)
        • 轻量级锁(多个线程交替进入临界区)
        • 重量级锁(多个线程同时进入临界区)
        • 锁的比较
        • 锁的优化
        • 自旋锁
        • 适应性自旋锁
        • 锁消除
        • 锁粗化
      • 原子操作原理
        • CPU 如何实现原子操作
        • Java 如何实现原子操作
        • CAS 在 OpenJDK 中的实现
      • ConcurrentHashMap
      • CopyOnWriteArrayList
      • BlockingQueue
      • ThreadLocal
      • Semaphore(信号量)
      • CyclicBarrier(可循环使用的屏障/栅栏)
      • Exchanger(两个线程交换数据)
      • CountDownLatch(闭锁)
      • FutureTask(Future 实现类)
        • Future
      • CompletableFuture
        • 将批量同步操作转为异步操作(并行流/CompletableFuture)
        • 多个异步任务合并
        • 回调
        • API
        • supplyAsync 提交任务
        • thenApply 变换(等待前一个任务返回后执行,处于同一个 CompletableFuture)
        • thenAccept 消耗
        • thenRun 执行下一步操作,不关心上一步结果
        • thenCombine 结合两个 CompletionStage 的结果,进行转化后返回
        • thenCompose(合并多个 CompletableFuture,流水线执行,在调用外部接口返回 CompletableFuture 类型时更方便)
        • thenAccptBoth 结合两个 CompletionStage 的结果,进行消耗
        • runAfterBoth 在两个 CompletionStage 都运行完执行,不关心上一步结果
        • applyToEither 两个 CompletionStage,谁计算的快,我就用那个 CompletionStage 的结果进行下一步的转化操作
        • acceptEither 两个 CompletionStage,谁计算的快,我就用那个 CompletionStage 的结果进行下一步的消耗操作
        • runAfterEither 两个 CompletionStage,任何一个完成了都会执行下一步的操作,不关心上一步结果
        • exceptionally 当运行时出现了异常,可以进行补偿
        • whenComplete 当运行完成时,若有异常则改变返回值,否则返回原值
        • handle 当运行完成时,无论有无异常均可转换
        • allOf
        • anyOf
      • ForkJoin
        • 原理浅析
        • 与 MapReduce 的区别
      • 引入原因
      • Executor ExecutorService ScheduledExecutorService
        • ExecutorService
        • ScheduledExecutorService
      • ThreadPoolExecutor
        • 使用注意
        • 扩展 ThreadPoolExecutor
        • 任务时限
        • 任务关闭
      • ScheduledThreadPoolExecutor
      • Executors
        • FixedThreadPool
        • SingleThreadExecutor
        • CachedThreadPool
        • ScheduledThreadPoolExecutor
        • SingleThreadScheduledExecutor
      • CompletionService
      • AbstractQueuedSynchronizer(AQS)
        • AQS 的接口
        • AQS 使用实例(互斥锁,tryAcquire 只需一次 CAS)
        • AQS 实现
        • AQS#state getState setState
        • 同步队列
        • AQS#Node
        • 独占式同步状态
        • AQS#acquire
        • addWaiter(新 Node 添加到同步队列尾部,初始状态下 head 是一个空节点)
        • acquireQueued
        • 独占式超时获取同步状态
        • AQS#tryAcquireNanos
        • AQS#doAcquireNanos
    • Spring源码解析
    • 设计模式
    • Java 集合
    • JVM
    • Spring使用与实现总结
    • Java集合面试题及答案
  • Tomcat

  • Redis

  • 分布式

  • Linux

  • Docker

  • 后端
  • Java
yolofyi
2020-07-20
目录

Java并发

# 四. 并发框架

# 一.线程

# 0.关于线程你需要搞懂这些:

  • 线程的状态
  • 线程的几种实现方式
  • 三个线程轮流打印 ABC 十次
  • 判断线程是否销毁
  • yield 功能
  • 给定三个线程 t1,t2,t3,如何保证他们依次执行

# 1. 基本概念

# 2. 线程的启动

# 2.1 实现 Runnable 接口

  • 1.自定义一个线程,实现 Runnable 接口的 run 方法 run 方法就是要执行的内容,会在另一个分支上进行 Thread 类本身也实现了 Runnable 接口
  • 2.主方法中 new 一个自定义线程对象,然后 new 一个 Thread 类对象,其构造方法的参数是自定义线程对象
  • 3.执行 Thread 类的 start 方法,线程开始执行 自此产生了分支,一个分支会执行 run 方法,在主方法中不会等待 run 方法调用完毕返回才继续执行,而是直接继续执行,是第二个分支。这两个分支并行运行

这里运用了静态代理模式: Thread 类和自定义线程类都实现了 Runnable 接口 Thread 类是代理 Proxy,自定义线程类是被代理类 通过调用 Thread 的 start 方法,实际上调用了自定义线程类的 start 方法(当然除此之外还有其他的代码)

# 2.2 继承 Thread 类

  • 自定义一个类 MyThread,继承 Thread 类,重写 run 方法
  • 在 main 方法中 new 一个自定义类,然后直接调用 start 方法 两个方法比较而言第二个方法代码量较少 但是第一个方法比较灵活,自定义线程类还可以继承其他的类,而不限于 Thread 类

# 2.3 实现 Callable 接口

# 3. 线程的状态

# 初始态:NEW

创建一个 Thread 对象,但还未调用 start()启动线程时,线程处于初始态。

# 运行态:RUNNABLE

在 Java 中,运行态包括就绪态 和 运行态。

# 就绪态 READY

该状态下的线程已经获得执行所需的所有资源,只要 CPU 分配执行权就能运行。 所有就绪态的线程存放在就绪队列中。

# 运行态 RUNNING

获得 CPU 执行权,正在执行的线程。 由于一个 CPU 同一时刻只能执行一条线程,因此每个 CPU 每个时刻只有一条运行态的线程。

# 阻塞态 BLOCKED

阻塞态专指请求排它锁失败时进入的状态。

# 等待态 WAITING

当前线程中调用 wait、join、park 函数时,当前线程就会进入等待态。 进入等待态的线程会释放 CPU 执行权,并释放资源(如:锁),它们要等待被其他线程显式地唤醒。

# 超时等待态 TIME_WAITING

当运行中的线程调用 sleep(time)、wait、join、parkNanos、parkUntil 时,就会进入该状态; 进入该状态后释放 CPU 执行权 和 占有的资源。 与等待态的区别:无需等待被其他线程显式地唤醒,在一定时间之后它们会由系统自动唤醒。

# 终止态

线程执行结束后的状态。

# 4. 线程的方法

# getName

Thread 类的构造方法 1 Thread 类的构造方法 2

  • new 一个子类对象的同时也 new 了其父类的对象,只是如果不显式调用父类的构造方法 super(),那么会自动调用无参数的父类的构造方法。 可以在自定义类 MyThread 中(继承自 Thread 类)中写一个构造方法,显式调用父类的构造方法,其参数为一个字符串,表示创建一个以该字符串为名字的 Thread 对象。
  • 效果是创建了一个 MyThread 对象,并且其父类 Thread 对象的名字是给定的字符串。
  • 如果不显式调用父类的构造方法 super(参数),那么默认父类 Thread 是没有名字的。

# isAlive

isAlive 活着的定义是就绪、运行、阻塞状态 线程是有优先级的,优先级高的获得 Cpu 执行时间长,并不代表优先级低的就得不到执行

# sleep(当前线程.sleep)

sleep 时持有的锁不会自动释放,sleep 时可能会抛出 InterruptedException。 Thread.sleep(long millis) 一定是当前线程调用此方法,当前线程进入 TIME_WAIT 状态,但不释放对象锁,millis 后线程自动苏醒进入 READY 状态。作用:给其它线程执行机会的最佳方式。

# join(其他线程.join)

t.join()/t.join(long millis) 当前线程里调用线程 1 的 join 方法,当前线程进入 WAIT 状态,但不释放对象锁,直到线程 1 执行完毕或者 millis 时间到,当前线程进入可运行状态。 join 方法的作用是将分出来的线程合并回去,等待分出来的线程执行完毕后继续执行原有线程。类似于方法调用。(相当于调用 thead.run())

# yield(当前线程.yield)

Thread.yield(),一定是当前线程调用此方法,当前线程放弃获取的 cpu 时间片,由运行状态变会可运行状态,让 OS 再次选择线程。作用:让相同优先级的线程轮流执行,但并不保证一定会轮流执行。实际中无法保证 yield()达到让步目的,因为让步的线程还有可能被线程调度程序再次选中。Thread.yield()不会导致阻塞。

# interrupt(其他线程.interrupt)

  • 调用 Interrupt 方法时,线程的中断状态将被置位。这是每一个线程都具有的 boolean 标志; 中断可以理解为线程的一个标志位属性,表示一个运行中的线程是否被其他线程进行了中断操作。这里提到了其他线程,所以可以认为中断是线程之间进行通信的一种方式,简单来说就是由其他线程通过执行 interrupt 方法对该线程打个招呼,让起中断标志位为 true,从而实现中断线程执行的目的。
  • 其他线程调用了 interrupt 方法后,该线程通过检查自身是否被中断进行响应,具体就是该线程需要调用 Thread.currentThread().isInterrupted 方法进行判断是否被中断或者调用 Thread 类的静态方法 interrupted 对当前线程的中断标志位进行复位(变为 false)。需要注意的是,如果该线程已经处于终结状态,即使该线程被中断过,那么调用 isInterrupted 方法返回仍然是 false,表示没有被中断。
  • 那么是不是线程调用了 interrupt 方法对该线程进行中断,该线程就会被中断呢?答案是否定的。因为Java 虚拟机对会抛出 InterruptedException 异常的方法进行了特别处理:Java 虚拟机会将该线程的中断标志位清除,然后抛出 InterruptedException,这个时候调用 isInterrupted 方法返回的也是 false。

interrupt 一个其他线程 t 时

  • 1)如果线程 t 中调用了可以抛出 InterruptedException 的方法,那么会在 t 中抛出 InterruptedException 并清除中断标志位。
  • 2)如果 t 没有调用此类方法,那么会正常地将设置中断标志位。

如何停止线程?

  • 1)在 catch InterruptedException 异常时可以关闭当前线程;
  • 2)循环调用 isInterrupted 方法检测是否被中断,如果被中断,要么调用 interrupted 方法清除中断标志位,要么就关闭当前线程。
  • 3)无论 1)还是 2),都可以通过一个 volatile 的自定义标志位来控制循环是否继续执行

但是注意! 如果线程中有阻塞操作,在阻塞时是无法去检测中断标志位或自定义标志位的,只能使用 1)的 interrupt 方法才能中断线程,并且在线程停止前关闭引起阻塞的资源(比如 Socket)。

# wait(对象.wait)

  • 调用 obj 的 wait(), notify()方法前,必须获得 obj 锁,也就是必须写在 synchronized(obj) 代码段内。
  • obj.wait(),当前线程调用对象的 wait()方法,当前线程释放对象锁,进入等待队列。依靠 notify()/notifyAll()唤醒或者 wait(long timeout)timeout 时间到自动唤醒。
  • 调用 wait()方法的线程,如果其他线程调用该线程的 interrupt()方法,则会重新尝试获取对象锁。只有当获取到对象锁,才开始抛出相应的 InterruptedException 异常,从 wait 中返回。

# notify(对象.notify)

obj.notify()唤醒在此对象监视器上等待的单个线程,选择是任意性的。notifyAll()唤醒在此对象监视器上等待的所有线程。

# wait&notify 最佳实践

等待方(消费者)和通知方(生产者)

等待方:
synchronized(obj){
	while(条件不满足){
 	obj.wait();
}
消费;
}

通知方:
synchonized(obj){
	改变条件;
	obj.notifyAll();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
  • 1)条件谓词:
  • 将与条件队列相关联的条件谓词以及在这些条件谓词上等待的操作都写入文档。
  • 在条件等待中存在一种重要的三元关系,包括加锁、wait 方法和一个条件谓词。在条件谓词中包含多个状态变量,而状态变量由一个锁来保护,因此在测试条件谓词之前必须先持有这个锁。锁对象和条件队列对象(即调用 wait 和 notify 等方法所在的对象)必须是同一个对象。
  • 当线程从 wait 方法中被唤醒时,它在重新请求锁时不具有任何特殊的优先性,而要去其他尝试进入同步代码块的线程一起正常地在锁上进行竞争。
  • 每一次 wait 调用都会隐式地与特定的条件谓词关联起来。当调用某个特定条件谓词的 wait 时,调用者必须已经持有与条件队列相关的锁,并且这个锁必须保护着构成条件谓词的状态变量。
  • 2)过早唤醒: 虽然在锁、条件谓词和条件队列之间的三元关系并不复杂,但 wait 方法的返回并不一定意味着线程正在等待的条件谓词已经变成真了。 当执行控制重新进入调用 wait 的代码时,它已经重新获取了与条件队列相关联的锁。现在条件谓词是不是已经变为真了?或许。在发出通知的线程调用 notifyAll 时,条件谓词可能已经变成真,但在重新获取锁时将再次变成假。在线程被唤醒到 wait 重新获取锁的这段时间里,可能有其他线程已经获取了这个锁,并修改了对象的状态。或者,条件谓词从调用 wait 起根本就没有变成真。你并不知道另一个线程为什么调用 notify 或 notifyAll,也许是因为与同一条件队列相关的另一个条件谓词变成了真。一个条件队列与多个条件谓词相关是一种很常见的情况。 基于所有这些原因,每当线程从 wait 中唤醒时,都必须再次测试条件谓词。
  • 3)notify 与 notifyAll: 由于多个线程可以基于不同的条件谓词在同一个条件队列上等待,因此如果使用 notify 而不是 notifyAll,那么将是一种危险的操作,因为单一的通知很容易导致类似于信号地址(线程必须等待一个已经为真的条件,但在开始等待之前没有检查条件谓词)的问题。

只有同时满足以下两个条件时,才能用单一的 notify 而不是 notifyAll:

  • 1)所有等待线程的类型都相同。只有一个条件谓词与条件队列相关,并且每个线程在从 wait 返回后将执行相同的操作。
  • 2)单进单出:在对象状态上的每次改变,最多只能唤醒一个线程来执行。

# suspend resume stop destroy(废弃方法)

  • 线程的暂停、恢复、停止对应的就是 suspend、resume 和 stop/destroy。
  • suspend 会使当前线程进入阻塞状态并不释放占有的资源,容易引起死锁;
  • stop 在结束一个线程时不会去释放占用的资源。它会直接终止 run 方法的调用,并且会抛出一个 ThreadDeath 错误。
  • destroy 只是抛出一个 NoSuchMethodError。
  • suspend 和 resume 已被 wait、notify 取代。

# 线程的优先级

判断当前线程是否正在执行 注意优先级是概率而非先后顺序(优先级高可能会执行时间长,但也不一定)

线程优先级特性:

  • 继承性 比如 A 线程启动 B 线程,则 B 线程的优先级与 A 是一样的。
  • 规则性 高优先级的线程总是大部分先执行完,但不代表高优先级线程全部先执行完。
  • 随机性 优先级较高的线程不一定每一次都先执行完。 注意,在不同的 JVM 以及 OS 上,线程规划会存在差异,有些 OS 会忽略对线程优先级的设定。

# 守护线程

  • 将线程转换为守护线程
  • 守护线程的唯一用途是为其他线程提供服务。比如计时线程,它定时发送信号给其他线程;
  • 当只剩下守护线程时,JVM 就退出了。
  • 守护线程应该永远不去访问固有资源,如文件、数据库,因为它会在任何时候甚至在一个操作的中间发生中断。
  • 注意!Java 虚拟机退出时 Daemon 线程中的 finally 块并不一定会被执行。

# 未捕获异常处理器

在 Runnable 的 run 方法中不能抛出异常,如果某个异常没有被捕获,则会导致线程终止。

要求异常处理器实现 Thread.UncaughtExceptionHandler 接口。 可以使用 setUncaughtExceptionHandler 方法为任何一个线程安装一个处理器, 也可以使用 Thread.setDefaultUncaughtExceptionHandler 方法为所有线程安装一个默认的处理器;

如果不安装默认的处理器,那么默认的处理器为空。如果不为独立的线程安装处理器,此时的处理器就是该线程的 ThreadGroup 对象 ThreadGroup 类实现了 Thread.UncaughtExceptionHandler 接口,它的 uncaughtException 方法做如下操作:

  • 1)如果该线程组有父线程组,那么父线程组的 uncaughtException 方法被调用。
  • 2)否则,如果 Thread.getDefaultExceptionHandler 方法返回一个非空的处理器,则调用该处理器。
  • 3)否则,如果 Throwable 是 ThreadDeath 的一个实例(ThreadDeath 对象由 stop 方法产生,而该方法已过时),什么都不做。
  • 4)否则,线程的名字以及 Throwable 的栈踪迹被输出到 System.err 上。

如果是由线程池 ThreadPoolExecutor 执行任务,只有通过 execute 提交的任务,才能将它抛出的异常交给 UncaughtExceptionHandler,而通过 submit 提交的任务,无论是抛出的未检测异常还是已检查异常,都将被认为是任务返回状态的一部分。如果一个由 submit 提交的任务由于抛出了异常而结束,那么这个异常将被 Future.get 封装在 ExecutionException 中重新抛出。

# 二.并发编程的问题

# 线程引入开销:上下文切换与内存同步

使用多线程编程时影响性能的首先是线程的上下文切换。每个线程占有一个 CPU 的时间片,然后会保存该线程的状态,切换到下一个线程执行。线程的状态的保存与加载就是上下文切换。 减少上下文切换的方法有:无锁并发编程、CAS、使用最少线程、协程。

  • 1)无锁并发:通过某种策略(比如 hash 分隔任务)使得每个线程不共享资源,避免锁的使用。
  • 2)CAS:是比锁更轻量级的线程同步方式
  • 3)避免创建不需要的线程,避免线程一直处于等待状态
  • 4)协程:单线程实现多任务调度,单线程维持多任务切换

vmstat 可以查看上下文切换次数 jstack 可以 dump 线程信息,查看一个进程中各个线程的状态

  • 内存同步:在 synchronized 和 volatile 提供的可见性保证中可能会使用一些特殊指令,即内存栅栏。内存栅栏可以刷新缓存,使缓存失效,刷新硬件的写缓冲,以及停止执行管道。
  • 内存栅栏可能同样会对性能带来间接的影响,因为它们将抑制一些编译器优化操作。在内存栅栏中,大多数操作都是不能被重排序。 不要担心非竞争同步带来的开销,这个基本的机制已经非常快了,并且 JVM 还能进行额外的优化以进一步降低或消除开销。因此,我们应该将优化重点放在那些发生锁竞争的地方。

# 死锁

死锁后会陷入循环等待中。 如何避免死锁?

  • 1)避免一个线程同时获取多个锁
  • 2)避免一个线程在锁内占用多个资源,尽量保证每个锁只占用一个资源
  • 3)尝试使用定时锁 tryLock 替代阻塞式的锁
  • 4)对于数据库锁,加锁和解锁必须在一个数据库连接中,否则会解锁失败

# 线程安全性(原子性+可见性)

  • 1、对象的状态:对象的状态是指存储在状态变量中的数据,对象的状态可能包括其他依赖对象的域。在对象的状态中包含了任何可能影响其外部可见行为的数据。

  • 2、一个对象是否是线程安全的,取决于它是否被多个线程访问。这指的是在程序中访问对象的方式,而不是对象要实现的功能。当多个线程访问某个状态变量并且其中有一个线程执行写入操作时,必须采用同步机制来协同这些线程对变量的访问。同步机制包括 synchronized、volatile 变量、显式锁、原子变量。

  • 3、有三种方式可以修复线程安全问题:

    • 1)不在线程之间共享该状态变量
    • 2)将状态变量修改为不可变的变量
    • 3)在访问状态变量时使用同步
  • 4、线程安全性的定义:当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替执行,并且在主调代码中不需要任何额外的同步,这个类都能表现出正确的行为,那么就称这个类是线程安全的。

  • 5、无状态变量一定是线程安全的,比如局部变量。

  • 6、读取-修改-写入操作序列,如果是后续操作是依赖于之前读取的值,那么这个序列必须是串行执行的。在并发编程中,由于不恰当的执行时序而出现不正确的结果是一种非常重要的情况,它称为竞态条件(Race Condition)。最常见的竞态条件类型就是先检查后执行的操作,通过一个可能失效的观测结果来决定下一步的操作。

  • 7、复合操作:要避免竞态条件问题,就必须在某个线程修改该变量时,通过某种方式防止其他线程使用这个变量,从而确保其他线程只能在修改操作完成之前或之后读取和修改状态,而不是在修改状态的过程中。假定有两个操作 A 和 B,如果从执行 A 的线程看,当另一个线程执行 B 时,要么将 B 全部执行完,要么完全不执行 B,那么 A 和 B 对彼此来说就是原子的。原子操作是指,对于访问同一个状态的所有操作来说,这个操作是一个以原子方式执行的操作。 为了确保线程安全性,读取-修改-写入序列必须是原子的,将其称为复合操作。复合操作包含了一组必须以原子方式执行的接口以确保线程安全性。

  • 8、在无状态的类中添加一个状态时,如果这个状态完全由线程安全的对象来管理,那么这个类仍然是线程安全的。(比如原子变量)

  • 9、如果多个状态是相关的,需要同时被修改,那么对多个状态的操作必须是串行的,需要进行同步。要保持状态的一致性,就需要在单个原子操作中更新所有相关的状态变量。

  • 10、内置锁:synchronized(object){同步块} Java 的内置锁相当于一种互斥体,这意味着最多只有一个线程能持有这种锁,当线程 A 尝试获取一个由线程 B 持有的锁时,线程 A 必须等待或阻塞,直到线程 B 释放这个锁。如果 B 永远不释放锁,那么 A 也将永远地等待下去。

  • 11、重入:当某个线程请求一个由其他线程持有的锁时,发出请求的线程就会阻塞。然而,由于内置锁是可重入的,因此如果某个线程试图获得一个已经由它自己持有的锁,那么这个请求就会成功。重入意味着获取锁的操作的粒度是线程,而不是调用。重入的一种实现方法是,为每个锁关联一个获取计数值和一个所有者线程。当计数值为 0 时,这个锁就被认为是没有被任何线程持有。当线程请求一个未被持有的锁时,JVM 将记下锁的持有者,并且将获取计数值置 1。如果一个线程再次获取这个锁,计数值将递增,而当线程退出同步代码块时,计数值会相应递减。当计数值为 0 时,这个锁将被释放。

  • 12、对于可能被多个线程同时访问的可变状态变量,在访问它时都需要持有同一个锁,在这种情况下,我们称状态变量是由这个锁保护的。 每个共享的和可变的变量都应该只由一个锁来保护,从而使维护人员知道是哪一个锁。 一种常见的加锁约定是,将所有的可变状态都封装在对象内部,并提供对象的内置锁(this)对所有访问可变状态的代码路径进行同步。在这种情况下,对象状态中的所有变量都由对象的内置锁保护起来。

  • 13、不良并发:要保证同步代码块不能过小,并且不要将本应是原子的操作拆分到多个同步代码块中。应该尽量将不影响共享状态且执行时间较长的操作从同步代码块中分离出去,从而在这些操作的执行过程中,其他线程可以访问共享状态。

  • 14、可见性:为了确保多个线程之间对内存写入操作的可见性,必须使用同步机制。

  • 15、加锁与可见性:当线程 B 执行由锁保护的同步代码块时,可以看到线程 A 之前在同一个同步代码块中的所有操作结果。如果没有同步,那么就无法实现上述保证。加锁的含义不仅仅局限于互斥行为,还包括内存可见性。为了确保所有线程都能看到共享变量的最新值,所有执行读操作或写操作的线程都必须在同一个锁上同步。

  • 16、volatile 变量:当把变量声明为 volatile 类型后,编译器与运行时都会注意到这个变量是共享的,因此不会将该变量上的操作与其他内存操作一起重排序。volatile 变量不会被缓存在寄存器或其他对处理器不可见的地方,因此在读取 volatile 类型的变量时总会返回最新写入的值。volatile 的语义不足以确保递增操作的原子性,除非你能确保只有一个线程对变量执行写操作。原子变量提供了“读-改-写”的原子操作,并且常常用做一种更好的 volatile 变量。

  • 17、加锁机制既可以确保可见性,又可以确保原子性,而 volatile 变量只能确保可见性。

  • 18、当且仅当满足以下的所有条件时,才应该使用 volatile 变量:

    • 1)对变量的写入操作不依赖变量的当前值(不存在读取-判断-写入序列),或者你能确保只有单个线程更新变量的值。
    • 2)该变量不会与其他状态变量一起纳入不可变条件中
    • 3)在访问变量时不需要加锁
  • 19、栈封闭:在栈封闭中,只能通过局部变量才能访问对象。维护线程封闭性的一种更规范的方法是使用 ThreadLocal,这个类能使线程的某个值与保存值的对象关联起来,ThreadLocal 通过了 get 和 set 等访问接口或方法,这些方法为每个使用该变量的线程都存有一份独立的副本,因此 get 总是返回由当前执行线程在调用 set 时设置的最新值。

  • 20、在并发程序中使用和共享对象时,可以使用一些使用的策略,包括:

    • 1)线程封闭:线程封闭的对象只能由一个线程拥有,对象被封闭在该线程中,并且只能由这个线程修改。
    • 2)只读共享:在没有额外同步的情况下,共享的只读对象可以由多个线程并发访问,但任何线程都不能修改它。共享的只读对象包括不可变对象和事实不可变对象(从技术上来说是可变的,但其状态在发布之后不会再改变)。
    • 3)线程安全共享。线程安全的对象在其内部实现同步,因此多个线程可以通过对象的公有接口来进行访问而不需要进一步的同步。
    • 4)保护对象。被保护的对象只能通过持有对象的锁来访问。保护对象包括封装在其他线程安全对象中的对象,以及已发布并且由某个特定锁保护的对象。
  • 21、饥饿:当线程由于无法访问它所需要的资源而不能继续执行时,就发生了饥饿(某线程永远等待)。引发饥饿的最常见资源就是 CPU 时钟周期。比如线程的优先级问题。在 Thread API 中定义的线程优先级只是作为线程调度的参考。在 Thread API 中定义了 10 个优先级,JVM 根据需要将它们映射到操作系统的调度优先级。这种映射是与特定平台相关的,因此在某个操作系统中两个不同的 Java 优先级可能被映射到同一优先级,而在另一个操作系统中则可能被映射到另一个不同的优先级。 当提高某个线程的优先级时,可能不会起到任何作用,或者也可能使得某个线程的调度优先级高于其他线程,从而导致饥饿。 通常,我们尽量不要改变线程的优先级,只要改变了线程的优先级,程序的行为就将与平台相关,并且会导致发生饥饿问题的风险。 事务 T1 封锁了数据 R,事务 T2 又请求封锁 R,于是 T2 等待。T3 也请求封锁 R,当 T1 释放了 R 上的封锁后,系统首先批准了 T3 的请求,T2 仍然等待。然后 T4 又请求封锁 R,当 T3 释放了 R 上的封锁之后,系统又批准了 T 的请求......T2 可能永远等待

  • 22、活锁 活锁是另一种形式的活跃性问题,该问题尽管不会阻塞线程,但也不能继续执行,因为线程将不断重复执行相同的操作,而且总会失败。活锁通常发生在处理事务消息的应用程序中。如果不能成功处理某个消息,那么消息处理机制将回滚整个事务,并将它重新放到队列的开头。虽然处理消息的线程并没有阻塞,但也无法继续执行下去。这种形式的活锁通常是由过度的错误恢复代码造成的,因为它错误地将不可修复的错误作为可修复的错误。 当多个相互协作的线程都对彼此进行响从而修改各自的状态,并使得任何一个线程都无法继续执行时,就发生了活锁。要解决这种活锁问题,需要在重试机制中引入随机性。在并发应用程序中,通过等待随机长度的时间和回退可以有效地避免活锁的发生。

  • 23、当在锁上发生竞争时,竞争失败的线程肯定会阻塞。JVM 在实现阻塞行为时,可以采用自旋等待(Spin-Waiting,指通过循环不断地尝试获取锁,直到成功),或者通过操作系统挂起被阻塞的线程。这两种方式的效率高低,取决于上下文切换的开销以及在成功获取锁之前需要等待的时间。如果等待时间较短,则适合采用自旋等待的方式,而如果等待时间较长,则适合采用线程挂起方式。

  • 24、有两个因素将影响在锁上发生竞争的可能性:锁的请求频率,以及每次持有该锁的时间。如果二者的乘积很小,那么大多数获取锁的操作都不会发生竞争,会因此在该锁上的竞争不会对可伸缩性造成严重影响。然而,如果在锁上的请求量很高,那么需要获取该锁的线程将被阻塞并等待。在极端情况下,即使仍有大量工作等待完成,处理器也会被闲置。 有 3 种方式可以降低锁的竞争程度:

    • 1)减少锁的持有时间: 缩小锁的范围,将与锁无关的代码移出同步代码块,尤其是开销较大的操作以及可能被阻塞的操作(IO 操作)。 当把一个同步代码块分解为多个同步代码块时,反而会对性能提升产生负面影响。在分解同步代码块时,理想的平衡点将与平台相关,但在实际情况中,仅可以将一些大量的计算或阻塞操作从同步代码块移出时,才应该考虑同步代码块的大小。 减小锁的粒度:锁分解和锁分段 锁分解是采用多个相互独立的锁来保护独立的状态变量,从而改变这些变量在之前由单个锁来保护的情况。这些技术能减小锁操作的粒度,并能实现更高的可伸缩性,然而,使用的锁越多,那么发生死锁的风险也就越高。 锁分段:比如 JDK1.7 及之前的 ConcurrentHashMap 采用的方式就是分段锁的方式。
    • 2)降低锁的请求频率
    • 3)使用带有协调机制的独占锁,这些机制允许更高的并发性比如读写锁,并发容器等

# 四.线程间通信/线程同步 工具使用

# synchronized

synchronized 锁定的是对象而非代码,所处的位置是代码块或方法

# 一种使用方法是对代码块使用 synchronized 关键字

public void fun(){
	synchronized (this){ }
}
1
2
3
  • 括号中锁定的是普通对象或 Class 对象
  • 如果是 this,表示在执行该代码块时锁定当前对象,其他线程不能调用该对象的其他锁定代码块,但可以调用其他对象的所有方法(包括锁定的代码块),也可以调用该对象的未锁定的代码块或方法。
  • 如果是 Object o1,表示执行该代码块的时候锁定该对象,其他线程不能访问该对象(该对象是空的,没有方法,自然不能调用)
  • 如果是类.class,那么锁住了该类的 Class 对象,只对静态方法生效。

# 另一种写法是将 synchronized 作为方法的修饰符

  • public synchronized void fun() {} //这个方法执行的时候锁定该当前对象
  • 每个类的对象对应一把锁,每个 synchronized 方法都必须获得调用该方法的一个对象的锁方能执行,否则所属线程阻塞,方法一旦执行,就独占该锁,直到从该方法返回时才将锁释放,此后被阻塞的线程方能获得该锁,重新进入可执行状态。
  • 如果 synchronized 修饰的是静态方法,那么锁住的是这个类的 Class 对象,没有其他线程可以调用该类的这个方法或其他的同步静态方法。
  • 实际上,synchronized(this)以及非 static 的 synchronized 方法,只能防止多个线程同时执行同一个对象的这个代码段。 synchronized 锁住的是括号里的对象,而不是代码。对于非静态的 synchronized 方法,锁的就是对象本身也就是 this。
  • 获取锁的线程释放锁只会有两种情况:
    • 1)获取锁的线程执行完了该代码块,然后线程释放对锁的占有;
    • 2)线程执行发生异常,此时 JVM 会让线程自动释放锁。

# Lock

锁是可重入的(reentrant),因为线程可以重复获得已经持有的锁。锁保持一个持有计数(hold count)来跟踪对 lock 方法的嵌套调用。线程在每一次调用 lock 都要调用 unlock 来释放锁。由于这一特性,被一个锁保护的代码可以调用另一个使用相同的锁的方法。

public class TestReentrantLock {
   public static void main(String[] args) {
      Ticket ticket = new Ticket();
      new Thread(ticket, "一号窗口").start();
      new Thread(ticket, "二号窗口").start();
      new Thread(ticket, "三号窗口").start();
   }
}

class Ticket implements Runnable {
   private int tickets = 100;
   private Lock lock = new ReentrantLock();

   @Override
   public void run() {
      while (true) {
         lock.lock();
         try {
            Thread.sleep(50);
            if(tickets > 0){
               System.out.println(Thread.currentThread().getName() + "正在售票,余票为:" + (--tickets));
            }
         } catch (InterruptedException e) {
            e.printStackTrace();
         } finally {
            lock.unlock();
         }
      }
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# volatile

(作用是为成员变量的同步访问提供了一种免锁机制,如果声明一个成员变量是 volatile 的,那么会通知编译器和虚拟机这个成员变量是可能其他线程并发更新的 对于 volatile 修饰的变量,jvm 虚拟机只是保证从主内存加载到线程工作内存的值是最新的。

Java 内存模型简要介绍(后面会详细介绍):

  • 多线程环境下,会共享同一份数据(线程公共的内存空间)。为了提高效率,JVM 会为每一个线程设置一个线程私有的内存空间(线程工作内存),并将共享数据拷贝过来。写操作实际上写的是线程私有的数据。当写操作完毕后,将线程私有的数据写回到线程公共的内存空间。
  • 如果在写回之前其他线程读取该数据,那么返回的可能是修改前的数据,视读取线程的执行效率而定。
  • jvm 运行时刻内存的分配:其中有一个内存区域是 jvm 虚拟机栈,每一个线程运行时都有一个线程栈,线程栈保存了线程运行时候变量值信息。当线程访问某一个对象时候值的时候,首先通过对象的引用找到对应在堆内存的变量的值,然后把堆内存变量的具体值 load 到线程本地内存中,建立一个变量副本,之后线程就不再和对象在堆内存变量值有任何关系,而是直接修改副本变量的值,(从线程内存中读值)
  • 在修改完之后的某一个时刻(线程退出之前),把线程变量副本的值回写到对象在堆中变量。这样在堆中的对象的值就产生变化了。

final 修饰的变量是线程安全的

内存可见性问题是,当多个线程操作共享数据时,彼此不可见。 解决这个问题有两种方法:

  • 1、加锁:加锁会保证读取的数据一定是写回之后的,内存刷新。但是效率较低
  • 2、volatile:会保证数据在读操作之前,上一次写操作必须生效,即写回。
    • 1)修改 volatile 变量时会强制将修改后的值刷新到主内存中。
    • 2)修改 volatile 变量后会导致其他线程工作内存中对应的变量值失效。因此,再读取该变量值的时候就需要重新从读取主内存中的值。相较于 synchronized 是一种较为轻量级的同步策略,但是 volatile 不具备互斥性;不能保证修改变量时的原子性。
public class TestVolatile {
   public static void main(String[] args) {
      MyThread myThread = new MyThread();
      new Thread(myThread).start();
      while(true){
         synchronized (myThread) {
            if(myThread.isFlag()){
               System.out.println("flag被设置为true");
               break;
            }
         }
      }
   }
}

class MyThread implements Runnable{
   private volatile boolean flag = false;
   public boolean isFlag() {
      return flag;
   }
   public void setFlag(boolean flag) {
      this.flag = flag;
   }
   @Override
   public void run() {
      try {
         Thread.sleep(200);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      flag = true;
      System.out.println("flag="+flag);
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# Atomic

原子变量 可以实现原子性+可见性

# 五.Lock 使用 深入

# 可重入锁 ReentrantLock

在一些内置锁无法满足需求的情况下,ReentrantLock 可以作为一种高级工具。当需要一些高级功能时才应该使用 ReentrantLock,这些功能包括:可定时的、可轮询的与可中断的锁获取操作,公平队列,绑定多个条件以及非块结构的锁。否则,还是应该优先使用 synchronized。

  • 1)可中断:lock()适用于锁获取操作不受中断影响的情况,此时可以忽略中断请求正常执行加锁操作,因为该操作仅仅记录了中断状态(通过 Thread.currentThread().interrupt()操作,只是恢复了中断状态为 true,并没有对中断进行响应)。如果要求被中断线程不能参与锁的竞争操作,则此时应该使用 lockInterruptibly 方法,一旦检测到中断请求,立即返回不再参与锁的竞争并且取消锁获取操作(即 finally 中的 cancelAcquire 操作)。
  • 2)可定时:tryLock(time)
  • 3)可轮询:tryLock()
  • 4)可公平:公平锁与非公平锁
  • 5)绑定多个条件:一个锁可以对应多个条件,而 Object 锁只能对应一个条件
  • 6)非块结构:加锁与解锁不是块结构的

# Condition(与 wait&notify 区别)

BlockingQueue 就是基于 Condition 实现的。

一个 Condition 对象和一个 Lock 关联在一起,就像一个条件队列和一个内置锁相关联一样。要创建一个 Condition,可以在相关联的 Lock 上调用 Lock.newCondition 方法。

# Condition 与 wait&notify 区别

  • 1)Condition 比内置条件等待队列提供了更丰富的功能:在每个锁上可存在 可不响应中断、可等待至某个时间点、可公平的队列操作。 wait&notify 一定响应中断并抛出遗产;Condition 可以响应中断也可以不响应中断
  • 2)与内置条件队列不同的是,对于每个 Lock,可以有任意数量的 Condition 对象。
  • await() awaitUninterruptibly() await(time) Condition 对象继承了相关的 Lock 对象的公平性,对于公平的锁,线程会按照 FIFO 顺序从 Condition.await 中释放。

# await&signal

await 被中断会抛出 InterruptedException。

Condition 区分开了不同的条件谓词,更容易满足单次通知的需求。signal 比 signalAll 更高效,它能极大地减少在每次缓存操作中发生的上下文切换与锁请求的次数。

线程进入临界区(同步块)时,发现必须要满足一定条件才能执行。要使用一个条件对象来管理那些已经获得一个锁但是不能做有用工作的线程 条件对象也称为条件变量 一个锁对象可以有多个相关的条件对象,newCondition 方法可以获得一个条件对象。习惯上给每一个条件对象命名为可以反映它所表达的条件的名字。 当发现条件不满足时,调用 Condition 对象的 await 方法 此时线程被阻塞,并放弃了锁。等待其他线程的相关操作使得条件达到满足

等待获得锁的线程和调用 await 方法的线程有本质区别。一旦一个线程调用 await 方法,它进入该条件的等待集。当锁可用时,该线程不能马上解除阻塞,相反,它处于阻塞状态,直到另一个线程调用同一个条件的 signalAll 方法为止 await 方法和 signalAll 方法是配套使用的 await 进入等待,signalAll 解除等待

signalAll 方法会重新激活因为这一条件而等待的所有线程。当这些线程从等待集中移出时,它们再次成为可运行的,线程调度器将再次激活它们。同时它们将试图重新进入该对象。一旦锁可用,它们中的某个将从 await 调用返回,获得该锁并从被阻塞的地方继续执行

线程应该再次测试该条件。由于无法确保该条件被满足,signalAll 方法仅仅是通知正在等待的线程,此时有可能满足条件,值得再次去检测该条件 对于 await 方法的调用应该用在这种形式:

while(!(ok to continue)){
	condition.await();
}
1
2
3

最重要的是需要其他某个线程调用 signalAll 方法。当一个线程调用 await 方法,它没有办法去激活自身,只能寄希望于其他线程。如果没有其他线程来激活等待的线程,那么就会一直等待,出现死锁。 如果所有其他线程都被阻塞,且最后一个线程也调用了 await 方法,那么它也被阻塞,此时程序挂起。

signalAll 方法不会立刻激活一个等待的线程,仅仅是解除等待线程的阻塞,以便这些线程可以在当前线程(调用 signalAll 方法的线程)退出时,通过竞争来实现对对象的方法 这个 await 和 signalAll 方法的组合类似于 Object 对象的 wait 和 notifyAll 方法的组合

public class ConditionBoundedBuffer<T>  {
    private static final int BUFFER_SIZE = 20;
    protected final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();

    private final Condition notEmpty = lock.newCondition();

    private final T[] items = (T[]) new Object[BUFFER_SIZE];
    private int tail;
    private int head;
    private int count;

    public void put(T t) throws InterruptedException {
        lock.lock();
        try {
            while(count == items.length){
                notFull.await();
            }
            items[tail] = t;
            if(++tail == items.length){
                tail = 0;
            }
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try{
            while(count == 0){
                notEmpty.await();
            }
            T t = items[head];
            items[head] = null;
            if(++head == items.length){
                head = 0;
            }
            --count;
            notFull.signal();
            return t;
        }finally {
            lock.unlock();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

# 公平锁

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
1
2
3

在公平的锁上,线程按照他们发出请求的顺序获取锁,但在非公平锁上,则允许‘插队’:当一个线程请求非公平锁时,如果在发出请求的同时该锁变成可用状态,那么这个线程会跳过队列中所有的等待线程而获得锁。 非公平的 ReentrantLock 并不提倡插队行为,但是无法防止某个线程在合适的时候进行插队。

非公平锁性能高于公平锁性能的原因: 在恢复一个被挂起的线程与该线程真正运行之间存在着严重的延迟。

假设线程 A 持有一个锁,并且线程 B 请求这个锁。由于锁被 A 持有,因此 B 将被挂起。当 A 释放锁时,B 将被唤醒,因此 B 会再次尝试获取这个锁。与此同时,如果线程 C 也请求这个锁,那么 C 很可能会在 B 被完全唤醒之前获得、使用以及释放这个锁。这样就是一种双赢的局面:B 获得锁的时刻并没有推迟,C 更早的获得了锁,并且吞吐量也提高了。

当持有锁的时间相对较长或者请求锁的平均时间间隔较长,应该使用公平锁。在这些情况下,插队带来的吞吐量提升(当锁处于可用状态时,线程却还处于被唤醒的过程中)可能不会出现。

非公平锁可能会引起线程饥饿,但是线程切换更少,吞吐量更大

# 读写锁 ReentrantReadWriteLock

读写锁是一种性能优化措施,在一些特定的情况下能实现更高的并发性。在实际情况中,对于在多处理器系统上被频繁读取的数据结构,读写锁能够提高性能。而在其他情况下,读写锁的性能比独占锁的性能要略差一些,这是因为它们的复杂性更高。如果要判断在某种情况下使用读写锁是否会带来性能提升,最好对程序进行分析。由于 ReadWriteLock 使用 Lock 来实现锁的读写部分,因此如果分析结果表明读写锁没有提高性能,那么可以很容易地将读写锁换为独占锁。

ReentrantReadWriteLock 如果很多线程从一个数据结构中读取数据而很少线程修改其中数据,那么允许对读的线程共享访问是合适的。

读写锁:分为读锁和写锁,多个读锁不互斥,读锁与写锁互斥,这是由 jvm 自己控制的,你只要上好相应的锁即可。如果你的代码只读数据,可以很多人同时读,但不能同时写,那就上读锁;如果你的代码修改数据,只能有一个人在写,且不能同时读取,那就上写锁。总之,读的时候上读锁,写的时候上写锁!

特性:

  • (a).重入方面其内部的写锁可以获取读锁,但是反过来读锁想要获得写锁则永远都不要想。
  • (b).写锁可以降级为读锁,顺序是:先获得写锁再获得读锁,然后释放写锁,这时候线程将保持读锁的持有。反过来读锁想要升级为写锁则不可能,为什么?参看(a)
  • (c) 读锁不能升级为写锁,目的是保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其更新对其他获取到读锁的线程是不可见的。
  • (d).读锁可以被多个线程持有并且在作用时排斥任何的写锁,而写锁则是完全的互斥。这一特性最为重要,因为对于高读取频率而相对较低写入的数据结构,使用此类锁同步机制则可以提高并发量。
  • (e).不管是读锁还是写锁都支持 Interrupt,语义与 ReentrantLock 一致。
  • (f).写锁支持 Condition 并且与 ReentrantLock 语义一致,而读锁则不能使用 Condition,否则抛出 UnsupportedOperationException 异常。

public class TestReadWriteLock {
    public static void main(String[] args) {
        ReadWriteLockDemo demo = new ReadWriteLockDemo();
        for (int i = 0; i < 100; ++i) {
            new Thread(new Runnable() {

                @Override
                public void run() {
                    demo.get();
                }
            }, "Read" + i).start();
        }
        new Thread(new Runnable() {

            @Override
            public void run() {
                demo.set(222);
            }
        }, "Write").start();

    }
}

class ReadWriteLockDemo {
    private ReadWriteLock lock = new ReentrantReadWriteLock();
    private int data = 2;

    public void get() {
        lock.readLock().lock();
        try {
            System.out.println("读操作  " + Thread.currentThread().getName() + ":" + data);
        } finally {
            lock.readLock().unlock();
        }
    }

    public void set(int data) {
        lock.writeLock().lock();
        try {
            System.out.println("写操作  " + Thread.currentThread().getName() + ":" + data);
            this.data = data;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

# LockSupport(锁住的是线程,synchronized 锁住的是对象)

当需要阻塞或唤醒一个线程的时候,都会使用 LockSupport 工具类来完成相应工作。 LockSupport 定义了一组公共静态方法,这些方法提供了最基本的线程阻塞和唤醒功能,而 LockSupport 也成为构建同步组件的基础工具。 LockSupport 定义了一组以 park 开头的方法用来阻塞当前线程,以及 unpark(thread)方法来唤醒一个被阻塞的线程。 park 等方法还可以传入阻塞对象,有阻塞对象的 park 方法在 dump 线程时可以给开发人员更多的现场信息。

park 对于中断只会设置中断标志位,不会抛出 InterruptedException。 LockSupport 是可不重入的,如果一个线程连续 2 次调用 LockSupport .park(),那么该线程一定会一直阻塞下去 unpark 函数可以先于 park 调用。比如线程 B 调用 unpark 函数,给线程 A 发了一个“许可”,那么当线程 A 调用 park 时,它发现已经有“许可”了,那么它会马上再继续运行。

LockSupport.park()和 unpark(),与 object.wait()和 notify()的区别? 主要的区别应该说是它们面向的对象不同。阻塞和唤醒是对于线程来说的,LockSupport 的 park/unpark 更符合这个语义,以“线程”作为方法的参数, 语义更清晰,使用起来也更方便。而 wait/notify 的实现使得“阻塞/唤醒对线程本身来说是被动的,要准确的控制哪个线程、什么时候阻塞/唤醒很困难, 要不随机唤醒一个线程(notify)要不唤醒所有的(notifyAll)。 park/unpark 模型真正解耦了线程之间的同步。线程之间不再须要一个 Object 或者其他变量来存储状态。不再须要关心对方的状态。

# synchronized 与 Lock 的区别

  • 1)层次:前者是 JVM 实现,后者是 JDK 实现
  • 2)功能:前者仅能实现互斥与重入,后者可以实现 可中断、可轮询、可定时、可公平、绑定多个条件、非块结构 synchronized 在阻塞时不会响应中断,Lock 会响应中断,并抛出 InterruptedException 异常。
  • 3)异常:前者线程中抛出异常时 JVM 会自动释放锁,后者必须手工释放
  • 4)性能:synchronized 性能已经大幅优化,如果 synchronized 能够满足需求,则尽量使用 synchronized

# 六.原子操作类使用

  • 1、近年来,在并发算法领域的大多数研究都侧重于非阻塞算法,这种算法用底层的原子机器指令(compareAndSwap)代替锁来确保数据在并发访问中的一致性。非阻塞算法被广泛地用于在操作系统和 JVM 中实现进程/线程调度机制、垃圾回收机制以及锁和其他并发数据结构。 非阻塞算法可以使多个线程在竞争相同的数据时不会发生阻塞,因此它能在粒度更细的层次上进行协调,并且极大地减少调度开销。而且,在非阻塞算法中不存在死锁和其他活跃性问题。在基于锁的算法中,如果一个线程在休眠或自旋的同时持有一个锁,那么其他线程都无法执行下去,而非阻塞算法不会受到单个线程失败的影响。非阻塞算法常见应用是原子变量类。 即使原子变量没有用于非阻塞算法的开发,它们也可以用作一个更好的 volatile 类型变量。原子变量提供了与 volatile 类型变量相同的内存语义,此外还支持原子的更新操作,从而使它们更加适用于实现计数器、序列发生器和统计数据收集等,同时还能比基于锁的方法提供更高的可伸缩性。
  • 2、锁的缺点:
    • 1)在挂起和恢复线程等过程中存在着很大的开销,并且通常存在着较长时间的中断。
    • 2)volatile 变量同样存在一些局限:虽然它们提供了相似的可见性保证,但不能用于构建原子的操作。
    • 3)当一个线程正在等待锁时,它不能做任何其他事情。如果一个线程在持有锁的情况下被延迟执行,那么所有需要这个锁的线程都无法执行下去。
    • 4)总之,锁定方式对于细粒度的操作(比如递增计数器)来说仍然是一种高开销的机制。在管理线程之间的竞争应该有一种粒度更细的技术,比如 CAS。
  • 3、独占锁是一种悲观技术。对于细粒度的操作,还有另外一个更高效的办法,也是一种乐观的办法,通过这种方法可以在不发生干扰的情况下完成更新操作。这种方法需要借助冲突检查机制来判断在更新过程中是否存在来自其他线程的干扰,如果存在,这个操作将失败,并且可以重试。在针对多处理器操作而设计的处理器中提供了一些特殊的指令,用于管理对共享数据的并发访问。在早期的处理器中支持原子的测试并设置(Test-and-Set),获取并递增(Fetch-and-increment)以及交换(swap)指令。现在几乎所有的现代处理器都包含了某种形式的原子读-改-写指令,比如比较并交换(compare-and-swap)等。
  • 4、CAS 包含了三个操作数——需要读写的内存位置 V,进行比较的值 A 和拟写入的新值 B。当且仅当 V 的值等于 A 时,CAS 才会以原子方式用新值 B 来更新 V 的值,否则不会执行任何操作。无论位置 V 的值是否等于 A,都将返回 V 原有的值。 CAS 的含义是:我认为 V 的值应该为 A,如果是,那么将 V 的值更新为 B,否则不修改并告诉 V 的值实际为多少。 上面这段代码模拟了 CAS 操作(但实际上不是基于 synchronized 实现的原子操作,而是由操作系统支持的)。 当多个线程尝试使用 CAS 同时更新一个变量时,只有其中一个线程能更新变量的值,而其他线程都将失败。然而,失败的线程并不会被挂起,而是被告知在这次竞争中失败,并可以再次尝试。由于一个线程在竞争 CAS 时失败不会被阻塞,因此它可以决定是否重新尝试,或者执行一些恢复操作,也或者不执行任何操作。这种灵活性就大大减少了与锁相关的活跃性风险。
  • 5、基于 CAS 实现的非阻塞计数器
  • 6、初看起来,基于 CAS 的计数器似乎比基于锁的计数器在性能上更差一些,因为它需要执行更多的操作和更复杂的控制流,并且还依赖看似复杂的 CAS 操作。但实际上,当竞争程序不高时,基于 CAS 的计数器在性能上远远超过了基于锁的计数器,而在没有竞争时甚至更高。如果要快速获取无竞争的锁,那么至少需要一次 CAS 操作再加上与其他锁相关操作,因此基于锁的计数器即使在更好的情况下也会比基于 CAS 的计数器在一般情况下能执行更多的操作。 CAS 的主要缺点是,它将使调用者处理竞争问题,而在锁中能自动处理竞争问题。
  • 7、原子变量比锁的粒度更细,量级更轻,并且对于在多处理器系统上实现高性能的并发代码来说是非常关键的。原子变量将发生竞争的范围缩小到单个变量上,这是你获得的粒度最细的情况。更新原子变量的快速路径不会比获取锁的路径慢,并且通常会更快,而它的慢速路径肯定比锁的慢速路径快,因为它不需要挂起或重新调度线程。 原子变量类相当于一种泛化的 volatile 变量,能够支持原子的和有条件的读-改-写操作。 共用 12 个原子变量类,可分为 4 组:标量类、更新器类、数组类以及复合变量类。原子数组类中的元素可以实现原子更新。 原子数组类中的元素可以实现原子更新。原子数组类为数组的元素提供了 volatile 类型的访问语义,这是普通数组锁不具备的特性——volatile 类型的数组仅在数组引用上具有 volatile 语义,而在其元素上则没有。
  • 8、ABA 问题 有时候还需要知道“自从上次看到 V 的值为 A 以来,这个值是否发生了变化”,在某些算法中,如果 V 的值首先由 A 变为 B,再由 B 变为 A,那么仍然被认为是发生了变化,并需要重新执行算法中的某些步骤。 有一个相对简单的解决方案:不是更新某个引用的值,而是更新两个值,包括一个引用和一个版本号。即使这个值由 A 变为 B,然后又变为 A,版本号也将是不同的。AtomicStampedReference 支持在两个变量上执行原子的条件更新。 AtomicStampedReference 将更新一个“对象-引用”二元组,通过在引用上加上:“版本号”,从而避免 ABA 问题。类似地,AtomicMarkableRefernce 将更新一个“对象引用-布尔值”二元组,在某些算法中将通过这种二元组使节点保存在链表中同时又将其标记为“已删除的节点”。CAS 存在 ABA,循环时间长开销大,以及只能保证一个共享变量的原子操作(变量合并,AtomicReference)三个问题。

# 七.Java 内存模型 线程同步工具原理

# JMM Java Memory Model

# JMM 抽象结构

在 Java 中,堆内存在线程之间共享,线程之间的通信由 Java 内存模型 JMM 控制。线程之间的共享变量存储在主内存中,每个线程都有一个私有的本地内存(并不真实存在),本地内存中存储了线程读写共享变量的副本。

如果线程 A 与线程 B 之间要通信的话,必须要经历下面 2 个步骤:

  • 1)线程 A 把本地内存 A 中更新过的共享变量刷新到主内存中去。
  • 2)线程 B 到主内存中去读取线程 A 之前已更新过的共享变量。

# 指令重排序

  • 在执行程序时,为了提高性能,编译器和 CPU 常常会对指令进行重排序,分为以下 3 种类型:
  • 1、编译优化重排序。编译器在不改变单线程程序语义的前提下,可以重新安排语句执行顺序。
  • 2、指令级并行的重排序。CPU 采用了指令级并行技术将多条指令重叠执行。
  • 3、内存系统的重排序。由于 CPU 使用 cache 和读/写缓冲区,因此加载和存储操作可能在乱序执行。
  • 1 属于编译器重排序,2 和 3 属于处理器重排序。这些重排序可能会导致多线程程序出现内存可见性问题。
  • 对于编译器,JMM 的编译器重排序规则会禁止特定类型的编译器重排序(不是所有的编译器重排序都要禁止)。
  • 对于处理器重排序,JMM 的处理器重排序规则会要求 Java 编译器在生成指令序列时,插入特定类型的内存屏障(Memory Barriers,Intel 称之为 Memory Fence)指令,通过内存屏障指令来禁止特定类型的处理器重排序。

# 内存屏障

JMM 把内存屏障分为四类:

  • LoadLoad 屏障:对于这样的语句 Load1; LoadLoad; Load2,在 Load2 及后续读取操作要读取的数据被访问前,保证 Load1 要读取的数据被读取完毕。
  • StoreStore 屏障:对于这样的语句 Store1; StoreStore; Store2,在 Store2 及后续写入操作执行前,保证 Store1 的写入操作对其它处理器可见。
  • LoadStore 屏障:对于这样的语句 Load1; LoadStore; Store2,在 Store2 及后续写入操作被刷出前,保证 Load1 要读取的数据被读取完毕。
  • StoreLoad 屏障:对于这样的语句 Store1; StoreLoad; Load2,在 Load2 及后续所有读取操作执行前,保证 Store1 的写入对所有处理器可见。它的开销是四种屏障中最大的。在大多数处理器的实现中,这个屏障是个万能屏障,兼具其它三种内存屏障的功能。

# happens-before(抽象概念,基于内存屏障)

JDK1.5 后,Java 采用 JSR133 内存模型,通过 happens-before 概念来阐述操作之间的内存可见性。在 JMM 中,如果一个操作执行的结果要对另一个操作可见,那么这两个操作之间必须要有 happens-before 关系。

定义:

  • 1)如果一个操作 happens-before 另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前。
  • 2)两个操作之间存在 happens-before 关系,并不意味着 Java 平台的具体实现必须要按照 happens-before 关系指定的顺序来执行。如果重排序之后的执行结果,与按 happens-before 关系来执行的结果一致,那么这种重排序并不非法(也就是说,JMM 允许这种重排序)。
  • 上面的 1)是 JMM 对程序员的承诺。从程序员的角度来说,可以这样理解 happens-before 关系:如果 A happens-before B,那么 Java 内存模型将向程序员保证——A 操作的结果将对 B 可见,且 A 的执行顺序排在 B 之前。注意,这只是 Java 内存模型向程序员做出的保证!
  • 上面的 2)是 JMM 对编译器和处理器重排序的约束原则。正如前面所言,JMM 其实是在遵循一个基本原则:只要不改变程序的执行结果(指的是单线程程序和正确同步的多线程程序)编译器和处理器怎么优化都行。JMM 这么做的原因是:程序员对于这两个操作是否真的被重排序并不关心,程序员关心的是程序执行时的语义不能被改变(即执行结果不能被改变)。因此,happens-before 关系本质上和 as-if-serial 语义是一回事。

与程序员密切相关的 happens-before 规则如下。

  • 1)程序顺序规则:一个线程中的每个操作,happens-before 于该线程中的任意后续操作。(单线程顺序执行)
  • 2)监视器锁规则:对一个锁的解锁,happens-before 于随后对这个锁的加锁。(先解锁后加锁)比如:
    • lock.unlock();
    • lock.lock();
    • 那么不会重排序,因为重排序后肯定会发生死锁
  • 3)volatile 变量规则:对一个 volatile 域的写,happens-before 于任意后续对这个 volatile 域的读。(先写后读)
  • 4)传递性:如果 A happens-before B,且 B happens-before C,那么 A happens-before C。
  • 5)start 规则:如果线程 A 执行操作 ThreadB.start(),那么 A 线程的 ThreadB.start() happens-before 于线程 B 的任意操作。
  • 6)join 规则:如果线程 A 执行操作 ThreadB.join()并成功返回,那么线程 B 中的任意操作 happens-before 于线程 A 从 ThreadB.join()操作成功返回。

happens-before 与 JMM 的关系如下:

# 指令重排序

重排序是指编译器和处理器为了优化程序性能而对指令序列进行重新排序的一种手段。

# 数据依赖性

  • 对于单个 CPU 和单个线程中所执行的操作而言,如果两个操作都访问了同一个变量,且两个操作中有写操作,那么这两个操作就具有数据依赖性。
  • (RW,WW,WR)这三种操作只要重排序对操作的执行顺序,程序的执行结果就会被改变,因此,编译器和处理器在进行重排序的时候会遵守数据依赖性,不会改变存在数据依赖关系的两个操作的执行顺序。

# as-if-serial

  • as-if-serial:无论如何重排序,(单线程)程序的执行结果不能被改变。 编译器,runtime,CPU 都必须遵守 as-if-serial 语义,因此,编译器和 CPU 不会对存在数据依赖关系的操作进行重排序。
  • 在单线程中,对存在控制依赖性的操作进行重排序,不会改变执行结果,而在多线程中则可能会改变结果。

# 顺序一致性

程序未正确同步的时候,就可能存在数据竞争。 数据竞争:

  • 1)在一个线程中写一个变量
  • 2)在另一个线程中读同一个变量
  • 3)而且写和读没有通过同步来排序。

JMM 对正确同步的多线程程序的内存一致性做了如下保证: 如果程序是正确同步的,程序的执行将具有顺序一致性——即程序的执行结果与该程序的顺序一致性内存模型的执行结果相同。 顺序一致性模型有两大特性:

  • 1)一个线程中的所有操作必须按照程序的顺序来执行
  • 2)不管程序是否同步,所有线程都只能看到单一的操作执行顺序。每个操作都必须原子执行且立即对所有线程可见。

JMM 中,临界区内的代码可以重排序。

而对于未正确同步的多线程程序,JMM 只提供最小的安全性:线程执行时所读取到的值,要么是之前某个线程所写入的值,要么是默认值。

# volatile 原理

# 汇编上的实现

volatile 修饰的共享变量在转换为汇编语言后,会出现 Lock 前缀指令,该指令在多核处理器下引发了两件事:

  • 1、将当前处理器缓存行(CPU cache 中可以分配的最小存储单位)的数据写回到系统内存。
  • 2、这个写回内存的操作使得其他 CPU 里缓存了该内存地址的数据无效。 (当前 CPU 该变量的缓存回写;其他 CPU 该变量的缓存失效)

# 内存语义

volatile 写的内存语义:

  • 当写一个 volatile 变量时,JMM 会把该线程对应的本地内存中的共享变量值刷新到主内存 volatile 读的内存语义:
  • 当读一个 volatile 变量时,JMM 会把线程对应的本地内存置为无效,线程接下来将从主内存读取共享变量

一个 volatile 变量的单个读写操作,与一个普通变量的读写操作使用同一个锁来同步,它们的执行效果相同。锁的 happens-before 规则保证释放锁和获取锁的两个线程之间的内存可见性,这也意味着对一个 volatile 变量的读操作,总是能看到任意线程对该变量最后的写入。

对于 volatile 变量本身的单个读写操作具有原子性,但是与锁不同的是,多个对于 volatile 变量的复合操作不具有原子性。而锁的语义保证了临界区代码的执行具有原子性。

JAVA1.5 后,JSR-133 增强了 volatile 的内存语义,严格限制编译器和 CPU 对于 volatile 变量与普通变量的重排序,从而确保 volatile 变量的写-读操作可以实现线程之间的通信,提供了一种比锁更轻量级的线程通信机制。从内存语义的角度而言,volatile 的写-读与锁的释放-获取有相同的内存效果:写操作=锁的释放;读操作=锁的获取。

A 线程写一个 volatile 变量 x 后,B 线程读取 x 以及其他共享变量。

    1. 当 A 线程对 x 进行写操作时,JMM 会把该线程 A 对应的 cache 中的共享变量值刷新到主存中.(实质上是线程 A 向接下来要读变量 x 的线程发出了其对共享变量修改的消息)
  • 2.当 B 线程对 x 进行读取时,JMM 会把该线程对应的 cache 值设置为无效,而从主存中读取 x。(实质上是线程 B 接收了某个线程发出的对共享变量修改的消息)

两个步骤综合起来看,在线程 B 读取一个 volatile 变量 x 后,线程 A 本地 cache 中在写这个变量 x 之前所有其他可见的共享变量的值都立即变得对 B 可见。线程 A 写 volatile 变量 x,B 读 x 的过程实质上是线程 A 通过主存向 B 发送消息。

需要注意的是,由于 volatile 仅仅保证对单个 volatile 变量的读写操作具有原子性,而锁的互斥则可以确保整个临界区代码执行的原子性。

# 内存语义的实现(内存屏障)

  • 在每个 volatile 写操作的前面插入一个 StoreStore 屏障
  • 在每个 volatile 写操作的后面插入一个 StoreLoad 屏障
  • 在每个 volatile 读操作的后面插入一个 LoadLoad 屏障
  • 在每个 volatile 读操作的后面插入一个 LoadStore 屏障

StoreStore 屏障;(禁止上面的普通写和下面的 volatile 写重排序,保证上面所有的普通写在 volatile 写之前刷新到主内存) volatile 写; StoreLoad 屏障;(禁止上面的 volatile 写和下面的 volatile 读/写重排序)

volatile 读; LoadLoad 屏障; LoadStore 屏障;

从汇编入手分析 volatile 多线程问题

  • 1、普通方式 int i,执行 i++: 图片1.jpg 普通方式没有任何与锁有关的指令;其他方式都出现了与锁相关的汇编指令 lock。 解释指令:其中 edi 为 32 位寄存器。如果是 long 则为 64 位的 rdi 寄存器。
  • 2、volatile 方式 volatile int i,执行 i++: 图片2.jpg 指令“lock; addl $0,0(%%esp)”表示加锁,把 0 加到栈顶的内存单元,该指令操作本身无意义,但这些指令起到内存屏障的作用,让前面的指令执行完成。具有 XMM2 特征的 CPU 已有内存屏障指令,就直接使用该指令。 volatile 字节码为: 图片3.jpg

# synchronized 原理

# monitor

代码块同步是使用 monitorenter 和 monitorexit 指令实现。monitorenter 和 monitorexit 指令是在编译后插入到同步代码块开始和结束的的位置。任何一个对象都有一个 monitor 与之关联,当一个 monitor 被某个线程持有之后,该对象将处于锁定状态。线程执行到 monitorenter 指令时,会尝试获取该对象对应的 monitor 所有权,也即获得对象的锁。

monitorenter :每个对象有一个监视器锁(monitor)。当 monitor 被占用时就会处于锁定状态,线程执行 monitorenter 指令时尝试获取 monitor 的所有权,过程如下:

  • 1、如果 monitor 的进入数为 0,则该线程进入 monitor,然后将进入数设置为 1,该线程即为 monitor 的所有者。
  • 2、如果线程已经占有该 monitor,只是重新进入,则进入 monitor 的进入数加 1.
  • 3、如果其他线程已经占用了 monitor,则该线程进入阻塞状态,直到 monitor 的进入数为 0,再重新尝试获取 monitor 的所有权。

monitorexit:执行 monitorexit 的线程必须是对象所对应的 monitor 的所有者。 指令执行时,monitor 的进入数减 1,如果减 1 后进入数为 0,那线程退出 monitor,不再是这个 monitor 的所有者。其他被这个 monitor 阻塞的线程可以尝试去获取这个 monitor 的所有权。 其实 wait/notify 等方法也依赖于 monitor 对象,这就是为什么只有在同步的块或者方法中才能调用 wait/notify 等方法,否则会抛出 java.lang.IllegalMonitorStateException 的异常的原因 在 HotSpotJVM 实现中,锁有个专门的名字:对象监视器。

# 汇编上的实现(cmpxchg)

synchronizied 方式实现 i++ 字节码: 图片4.jpg 汇编 图片5.jpg monitorenter 与 monitorexit 包裹了 getstatic i 及 putstatic i,等相关代码执行指令。中间值的交换采用了原子操作 lock cmpxchg %rsi,(%rdi),如果交换成功,则执行 goto 直接退出当前函数 return。如果失败,执行 jne 跳转指令,继续循环执行,直到成功为止。

cmpxchg 指令:比较 rsi 和目的操作数 rdi(第一个操作数)的值,如果相同,ZF 标志被设置,同时源操作数(第二个操作)的值被写到目的操作数,否则,清 ZF 标志为 0,并且把目的操作数的值写回 rsi,则执行 jne 跳转指令。

# Java 对象头

synchronized 用的锁放在 java 对象头里。 有两种情况: 数组对象,虚拟机使用 3 个字宽存储对象头。 非数组对象,则使用 2 个字宽来存储对象头。32 位虚拟机中,1 个字宽等于 4 字节,即 32 字节。

长度 内容 说明
32/64bit mark word 存储对象的 hashCode 或者锁信息
32/64bit Class metadata address 存储对象描述数据的指针
32/64bit Array length 数组的长度

Mark Word 的存储结构: 图片6.jpg

# 锁的分类

  • synchronized 是重量级锁,效率较低。
  • synchronized 所用到的锁是存在 Java 对象头中。在 Java1.6 中,锁一共有 4 种状态,由低到高依次是:无锁,偏向锁,轻量级锁,重量级锁,这几种状态会随着竞争情况逐渐升级。锁可以升级但不能降级,意味着偏向锁升级成轻量级锁不能降级成偏向锁。这种锁升级却不能降级的策略,目的是为了提高获得锁和释放锁的效率。
  • monitorenter 和 monitorexit 是上层指令,底层实现可能是偏向锁、轻量级锁、重量级锁等。 图片7.jpg

# 偏向锁(只有一个线程进入临界区)

引入背景:大多数情况下锁不仅不存在多线程竞争,而且总是由同一线程多次获得,为了让线程获得锁的代价更低而引入了偏向锁,减少不必要的 CAS 操作。

  • 加锁:当一个线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里存储锁偏向的线程 ID,以后该线程在进入和退出同步块时不需要花费 CAS 操作来加锁和解锁,而只需简单的测试一下对象头的 Mark Word 里是否存储着指向当前线程的偏向锁,如果测试成功,表示线程已经获得了锁,如果测试失败,则需要再测试下 Mark Word 中偏向锁的标识是否设置成 1(表示当前是偏向锁),如果没有设置,则使用 CAS 竞争锁,如果设置了,则尝试使用 CAS 将对象头的偏向锁指向当前线程(此时会引发竞争,偏向锁会升级为轻量级锁)。
  • 膨胀过程:当前线程执行 CAS 获取偏向锁失败(这一步是偏向锁的关键),表示在该锁对象上存在竞争并且这个时候另外一个线程获得偏向锁所有权。当到达全局安全点(safepoint)时获得偏向锁的线程被挂起,并从偏向锁所有者的私有 Monitor Record 列表中获取一个空闲的记录,并将 Object 设置 LightWeight Lock 状态并且 Mark Word 中的 LockRecord 指向刚才持有偏向锁线程的 Monitor record,最后被阻塞在安全点的线程被释放,进入到轻量级锁的执行路径中,同时被撤销偏向锁的线程继续往下执行同步代码。
  • 偏向锁,顾名思义,它会偏向于第一个获取锁的线程,如果在接下来的运行过程中,该锁没有被其他的线程尝试获取,则持有偏向锁的线程将永远不需要触发同步。 在锁对象的对象头中有个偏向锁的线程 ID 字段,这个字段如果是空的,第一次获取锁的时候,就 CAS 将自身的线程 ID 写入到 MarkWord 的偏向锁线程 ID 字段内,将 MarkWord 中的偏向锁的标识置 1。这样下次获取锁的时候,直接检查偏向锁线程 ID 是否和自身线程 ID 一致,如果一致,则认为当前线程已经获取了锁,因此不需再次获取锁;
  • 如果不一致,则表明在这个对象上已经存在竞争了,检查原来持有该对象锁的线程是否依然存活,如果挂了,则可以将对象变为无锁状态,然后重新偏向新的线程,如果原来的线程依然存活,则偏向锁升级为轻量级锁,(偏向锁就是这个时候升级为轻量级锁的) 图片8.jpg

# 轻量级锁(多个线程交替进入临界区)

  • 引入背景:轻量级锁认为竞争存在,但是竞争的程度很轻,一般两个线程对于同一个锁的操作都会错开,或者说稍微等待一下(自旋),另一个线程就会释放锁。 但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁膨胀为重量级锁,重量级锁使除了拥有锁的线程以外的线程都阻塞,防止 CPU 空转。
  • 轻量级锁加锁:线程在执行同步块之前,JVM 会先在当前线程的栈桢中创建用于存储锁记录的空间,并将对象头中的 Mark Word 复制到锁记录中,官方称为 Displaced Mark Word。然后线程尝试使用 CAS 将对象头中的 Mark Word 替换为指向锁记录的指针。如果成功,当前线程获得锁,如果失败,则自旋重试。重试一定次数后膨胀为重量级锁(修改 MarkWord,改为指向重量级锁的指针),阻塞当前线程。
  • 轻量级锁解锁:轻量级解锁时,会使用原子的 CAS 操作来将 Displaced Mark Word 替换回到对象头,如果成功,则表示没有竞争发生。如果失败,表示有其他线程尝试获得锁,则释放锁,并唤醒被阻塞的线程。 图片9.jpg

# 重量级锁(多个线程同时进入临界区)

在 JDK 1.6 之前,监视器锁可以认为直接对应底层操作系统中的互斥量(mutex)。这种同步方式的成本非常高,包括系统调用引起的内核态与用户态切换、线程阻塞造成的线程切换等。因此,后来称这种锁为“重量级锁”。

# 锁的比较

图片10.jpg

# 锁的优化

Java1.6 对锁的实现引入了大量的优化,如自旋锁、适应性自旋锁、锁消除、锁粗化、偏向锁、轻量级锁等技术来减少锁操作的开销。

# 自旋锁

线程的阻塞和唤醒需要 CPU 从用户态转为核心态,频繁的阻塞和唤醒对 CPU 来说是一件负担很重的工作,势必会给系统的并发性能带来很大的压力。同时我们发现在许多应用上面,对象锁的锁状态只会持续很短一段时间,为了这一段很短的时间频繁地阻塞和唤醒线程是非常不值得的。所以引入自旋锁。

何谓自旋锁?

  • 所谓自旋锁,就是让该线程等待一段时间,不会被立即挂起,看持有锁的线程是否会很快释放锁。怎么等待呢?执行一段无意义的循环即可(自旋)。
  • 自旋等待不能替代阻塞,先不说对处理器数量的要求(多核,貌似现在没有单核的处理器了),虽然它可以避免线程切换带来的开销,但是它占用了处理器的时间。如果持有锁的线程很快就释放了锁,那么自旋的效率就非常好,反之,自旋的线程就会白白消耗掉处理的资源,它不会做任何有意义的工作,反而会带来性能上的浪费。所以说,自旋等待的时间(自旋的次数)必须要有一个限度,如果自旋超过了规定的时间仍然没有获取到锁,则应该被挂起。

# 适应性自旋锁

  • JDK 1.6 引入了更加聪明的自旋锁,即自适应自旋锁。所谓自适应就意味着自旋的次数不再是固定的,它是由上一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。它怎么做呢?线程如果自旋成功了,那么下次自旋的次数会更加多,因为虚拟机认为既然上次成功了,那么此次自旋也很有可能会再次成功,那么它就会允许自旋等待持续的次数更多。
  • 反之,如果对于某个锁,很少有自旋能够成功的,那么在以后要获得这个锁的时候自旋的次数会减少甚至省略掉自旋过程,以免浪费处理器资源。 有了自适应自旋锁,随着程序运行和性能监控信息的不断完善,虚拟机对程序锁的状况预测会越来越准确,虚拟机会变得越来越聪明。

# 锁消除

为了保证数据的完整性,我们在进行操作时需要对这部分操作进行同步控制,但是在有些情况下,JVM 检测到不可能存在共享数据竞争,这是 JVM 会对这些同步锁进行锁消除。锁消除的依据是逃逸分析的数据支持。

# 锁粗化

    public void vectorTest(){
        Vector<String> vector = new Vector<String>();
        for(int i = 0 ; i < 10 ; i++){
            vector.add(i + "");
        }

        System.out.println(vector);
    }
1
2
3
4
5
6
7
8
  • 我们知道在使用同步锁的时候,需要让同步块的作用范围尽可能小—仅在共享数据的实际作用域中才进行同步,这样做的目的是为了使需要同步的操作数量尽可能缩小,如果存在锁竞争,那么等待锁的线程也能尽快拿到锁。
  • 在大多数的情况下,上述观点是正确的。但是如果一系列的连续加锁解锁操作,可能会导致不必要的性能损耗,所以引入锁粗化的概念。
  • 锁粗化是将多个连续的加锁、解锁操作连接在一起,扩展成一个范围更大的锁。如上面实例:vector 每次 add 的时候都需要加锁操作,JVM 检测到对同一个对象(vector)连续加锁、解锁操作,会合并一个更大范围的加锁、解锁操作,即加锁解锁操作会移到 for 循环之外。 图片11.jpg

# 原子操作原理

CAS 操作的意思是比较并交换,它需要两个数值,一个旧值(期望操作前的值)和新值。操作之前比较两个旧值是否变化,如无变化才交换为新值。

# CPU 如何实现原子操作

  • 1)在硬件层面,CPU 依靠总线加锁和缓存锁定机制来实现原子操作。
    • 使用总线锁保证原子性。如果多个 CPU 同时对共享变量进行写操作(i++),通常无法得到期望的值。CPU 使用总线锁来保证对共享变量写操作的原子性,当 CPU 在总线上输出 LOCK 信号时,其他 CPU 的请求将被阻塞住,于是该 CPU 可以独占共享内存。
    • 使用缓存锁保证原子性。频繁使用的内存地址的数据会缓存于 CPU 的 cache 中,那么原子操作只需在 CPU 内部执行即可,不需要锁住整个总线。缓存锁是指在内存中的数据如果被缓存于 CPU 的 cache 中,并且在 LOCK 操作期间被锁定,那么当它执行锁操作写回到内存时,CPU 修改内部的内存地址,并允许它的缓存一致性来保证操作的原子性,因为缓存一致性机制会阻止同时修改由两个以上处理器 缓存的 内存区域数据。当其他 CPU 回写被锁定的 cache 行数据时候,会使 cache 行无效。

# Java 如何实现原子操作

  • 2)Java 使用了锁和循环 CAS 的方式来实现原子操作。
    • 使用循环 CAS 实现原子操作。JVM 的 CAS 操作使用了 CPU 提供的 CMPXCHG 指令来实现,自旋式 CAS 操作的基本思路是循环进行 CAS 操作直到成功为止。1.5 之后的并发包中提供了诸如 AtomicBoolean, AtomicInteger 等包装类来支持原子操作。CAS 存在 ABA,循环时间长开销大,以及只能保证一个共享变量的原子操作三个问题。
    • cmpxchg(void* ptr, int old, int new),如果 ptr 和 old 的值一样,则把 new 写到 ptr 内存,否则返回 ptr 的值,整个操作是原子的。
    • 使用锁机制实现原子操作。锁机制保证了只有获得锁的线程才能给操作锁定的区域。JVM 的内部实现了多种锁机制。除了偏向锁,其他锁的方式都使用了循环 CAS,也就是当一个线程想进入同步块的时候,使用循环 CAS 方式来获取锁,退出时使用 CAS 来释放锁。

# CAS 在 OpenJDK 中的实现

以 compareAndSwapInt 为例:

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  return (jint)( (x, addr, e)) == e;
UNSAFE_END
1
2
3
4
5
6

linux 下

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}
1
2
3
4
5
6
7
8

程序会根据当前处理器的类型来决定是否为 cmpxchg 指令添加 lock 前缀。如果程序是在多处理器上运行,就为 cmpchg 指令加上 lock 前缀;如果是在单处理器上运行,就省略 lock 前缀。 Intel 对 lock 前缀的说明如下:

  • 1)确保对内存的读-改-写操作原子执行(基于总线锁或缓存锁)
  • 2)禁止该指令,与 之前 和 之后 的读写指令重排序
  • 3)把写缓冲区中的所有数据刷新到内存中。

# 同步容器

同步容器类都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护复合操作。容器上常见的复合操作包括:迭代,跳跃,以及条件运算。

# ConcurrentHashMap

它们提供的迭代器不会抛出 ConcurrentModificationException,因此不需要再迭代过程中对容器加锁。ConcurrentHasMap 返回的迭代器具有弱一致性,而并非及时失败。弱一致性的迭代器可以容忍并发的修改,当修改迭代器会遍历已有的元素,并可以在迭代器被构造后将修改操作反映给容器。

# CopyOnWriteArrayList

  • 用于替代同步 List,在某些情况下它提供了更好的并发性能,并且在迭代期间不需要对容器进行加锁或复制。在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。写入时复制容器返回的迭代器不会抛出
  • ConcurrentModificationException,并且返回的元素与迭代器创建时的元素完全一致,而不必考虑之后修改操作所带来的影响。
  • 显然,每当修改容器时都会复制底层数组,这需要一定的开销,特别是当容器的规模较大时,仅当迭代操作远远多于修改操作时,才应该使用写入时复制容器。

# BlockingQueue

  • 阻塞队列提供了可阻塞的 put 和 take 方法,以及支持定时的 offer 和 poll 方法。如果队列已经满了,那么 put 方法将阻塞直到有空间可用;如果队列为空,那么 take 方法将会阻塞直到有元素可用。队列可以是有界的也可以是无界的,无界队列永远都不会充满,因此无界队列上的 put 方法也永远不会阻塞。
  • 在构建高可靠的应用程序时,有界队列 ArrayBlockingQueue 是一种强大的资源管理工具:它们能抑制并防止产生过多的工作项,使应用程序在负荷过载的情况下变得更加健壮。

# ThreadLocal

在线程之间共享变量是存在风险的,有时可能要避免共享变量,使用 ThreadLocal 辅助类为各个线程提供各自的实例。 例如有一个静态变量

public static final SimpleDateFormat sdf = new SimpleDateFormat(“yyyy-MM-dd”);
1

如果两个线程同时调用 sdf.format(…) 那么可能会很混乱,因为 sdf 使用的内部数据结构可能会被并发的访问所破坏。当然可以使用线程同步,但是开销很大;或者也可以在需要时构造一个局部 SImpleDateFormat 对象。但这很浪费

# 同步工具使用

# Semaphore(信号量)

  • 信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。还可以用来实现某种资源池,或者对容器施加边界。
  • Semaphore 中管理着一组虚拟的许可 permit,许可的初始数量可通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么 acquire 将阻塞直到有许可(或者直到被中断或者操作超时)。release 方法将返回一个许可给信号量。计算信号量的一种简化形式是二值信号量,即初始值为 1 的 Semaphore。二值信号量可以用作互斥体,并具备不可重入的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。
  • 可以用于实现资源池,当池为空时,请求资源将会阻塞,直至存在资源。将资源返回给池之后会调用 release 释放许可。
public class BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore semaphore;

    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<T>());
        this.semaphore = new Semaphore(bound);
    }
    public boolean add(T t) throws InterruptedException {
        semaphore.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(t);
            return wasAdded;
        } finally {
            if(!wasAdded){
                semaphore.release();
            }
        }
    }

    public boolean remove(Object o){
        boolean wasRemoved = set.remove(o);
        if(wasRemoved){
            semaphore.release();
        }
        return wasRemoved;
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# CyclicBarrier(可循环使用的屏障/栅栏)

CountDownLatch CyclicBarrier
减计数方式 加计数方式
计算为 0 时释放所有等待的线 计数达到指定值时释放所有等待线程
计算为 0 时释放所有等待的线程 计数达到指定值时释放所有等待线程
计数为 0 时,无法重置 计数达到指定值时,计数置为 0 重新开始
调用 countDown()方法计数减一,调用 await()方法只进行阻塞,对计数没任何影响 调用 await()方法计数加 1,若加 1 后的值不等于构造方法的值,则线程阻塞
不可重复利用 可重复利用
  • 线程在 countDown()之后,会继续执行自己的任务,而 CyclicBarrier 会在所有线程任务结束之后,才会进行后续任务。
  • Barrier 类似于闭锁,它能阻塞一组线程直到某个线程发生。栅栏与闭锁的关键区别在于,前者未达到条件时每个线程都会阻塞在 await 上,直至条件满足所有线程解除阻塞,后者未达到条件时 countDown 不会阻塞,条件满足时会解除 await 线程的阻塞。
  • CyclicBarrier 可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用;这种算法通常将一个问题拆分成一系列相互独立的子问题。当线程到达栅栏位置时将调用 await 方法,这个方法将阻塞直到所有线程都达到栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置以便下次使用。
  • 如果对 await 方法的调用超时,或者 await 阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的 await 调用都被终止并抛出 BrokenBarrierException。如果成功通过栅栏,那么 await 将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来选举产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。CyclicBarrier 还可以使你将一个栅栏操作传递给构造函数,这是一个 Runnable,当成功通过栅栏时会在一个子任务线程中执行它。
  • 可以用于多线程计算数据,最后合并计算结果的场景。CountDownLatch 的计数器只能使用一次,而 CyclicBarrier 的计数器可以使用 reset 方法重置。
/**
 * 通过CyclicBarrier协调细胞自动衍生系统中的计算
 */
public class CellularAutomata {
    private final Board mainBoard;
    private final CyclicBarrier cyclicBarrier;
    private final Worker[] workers;

    public CellularAutomata(Board board){
        this.mainBoard = board;
        int count = Runtime.getRuntime().availableProcessors();
        this.cyclicBarrier = new CyclicBarrier(count, new Runnable() {
            public void run() {
                mainBoard.commitNewValues();
            }
        });
        this.workers = new Worker[count];
        for (int i = 0; i < count; i++) {
            workers[i] = new Worker(mainBoard.getSubBoard(count,i));
        }
    }

    private class Worker implements Runnable{
        private final Board board;

        public Worker(Board board){
            this.board = board;
        }

        public void run() {
            while (!board.hasConverged()) {
                for (int x = 0; x < board.getMaxX(); x++) {
                    for (int y = 0; y < board.getMaxY(); y++) {
                        board.setNewValue(x, y, computeValue(x, y));
                    }
                }
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                    return;
                }
            }
        }
    }

    private int computeValue(int x, int y) {
        return x+y;
    }


    public void start(){
        for (int i = 0; i < workers.length; i++) {
            new Thread(workers[i]).start();
        }
        mainBoard.waitForConvergence();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

# Exchanger(两个线程交换数据)

  • 另一种形式的栅栏是 Exchanger,它是一种两方栅栏,各方在栅栏位置上交换数据。当两方执行不对称的操作时,Exchanger 会非常有用。
  • Exchanger 用于进行线程间的数据交换。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这两个线程通过 exchange 方法交换数据, 如果第一个线程先执行 exchange 方法,它会一直等待第二个线程也执行 exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
public class TestExchanger {

    private Exchanger<String> exchanger = new Exchanger<String>();

    private ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public void start() {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String A = "银行流水A";// A录入银行流水数据
                    exchanger.exchange(A);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String B = "银行流水B";// B录入银行流水数据
                    String A = exchanger.exchange("B");
                    System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:"
                            + A + ",B录入是:" + B);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        threadPool.shutdown();
    }

    public static void main(String[] args) {
        new TestExchanger().start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# CountDownLatch(闭锁)

  • 闭锁可以延迟线程的进度直到其达到终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁达到结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动指导其他活动都完成后才继续执行。
  • 闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown 方法递减计数器,表示有一个事件发生了,而 await 方法等待计数器达到 0,这表示所有需要等待的事件都已经发生。如果计数器的值非零,那么 await 会一直阻塞直到计数器为 0,或者等待中的线程中断,或者等待超时。
public class TestCountDownLatch {
    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(5);
        LatchDemo latchDemo = new LatchDemo(latch);
        long begin = System.currentTimeMillis();
        for (int i = 0; i < 5; ++i) {
            new Thread(latchDemo).start();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();
        System.out.println("总计耗时:" + (end - begin));
    }
}

class LatchDemo implements Runnable {
    private CountDownLatch latch;

    public LatchDemo(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 50000; i++) {
                if (i % 2 == 0) {
                    System.out.println(i);
                }
            }
        } finally {
            latch.countDown();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

# FutureTask(Future 实现类)

  • FurureTask 是 Future 接口的唯一实现类。
  • FutureTask 表示的计算是通过 Callable 来实现的,相当于一种可生成结果的 Runnable,并且可以处于以下 3 种状态:等待运行、正在运行和运行完成。
  • Future.get 方法的行为取决于任务的状态。如果任务已经完成,那么 get 会立即返回结果,否则会阻塞直到任务进入完成状态,然后返回结果或者抛出异常。- --- -- FutureTask 将计算结果从执行计算的线程传递到获取这个结果的线程,而 FutureTask 的规范确保了这种传递过程能实现结果的安全发布。
  • Callable 表示的任务可以抛出受检查的或未受检查的异常,并且任何代码都可能抛出一个 Error。无论任务代码抛出什么异常,都会被封装到一个 ExecutionException 中,并在 future.get 中被重新抛出。
  • 当 get 方法抛出 ExecutionException,可能是以下三种情况之一:Callable 抛出的受检查异常,RuntimeException,以及 Error。

# Future

Future 接口设计初衷是对将来某个时刻会发生的结果进行建模。它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。 在 Future 中触发那些潜在耗时的操作把调用线程解放出来,让它能继续执行其他有价值的工作,不再需要等待耗时的操作完成。

示例

public void future() {
    ExecutorService executor = Executors.newCachedThreadPool();
    Future<Double> future = executor.submit(new Callable<Double>() {
        @Override
        public Double call() throws Exception {
            return doSomethingComputation();
        }
    });
    // 在另一个线程执行耗时操作的同时,去执行一些其他的任务。
    // 这些任务不依赖于future的结果,可以与future并发执行。
    // 如果下面的任务马上依赖于future的结果,那异步操作是没有意义的。
    doSomethingElse();
    try {
        // 如果不设置超时时间,那么线程会阻塞在这里。
        Double result = future.get(1, TimeUnit.SECONDS);
        System.out.println("result is " + result);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    } catch (TimeoutException e) {
        e.printStackTrace();
    }
}

private void doSomethingElse() {
    System.out.println("doSomethingElse");
}

private double doSomethingComputation() {
    System.out.println("doSomethingComputation");
    return 0.1;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

局限性

Future 无法实现以下的功能。

    1. 将两个异步操作计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的记过
  • 2)等待 Future 集合中的所有任务都完成
  • 3)仅等待 Future 集合中最快结束的任务完成,并返回它的结果
  • 4)通过编程方式完成一个 Future 任务的执行(以手工设定异步操作结果)
  • 5)应对 Future 的完成事件(完成回调)

# CompletableFuture

实现异步 API(将任务交给另一线程完成,该线程与调用方异步,通过回调函数或阻塞的方式取得任务结果)

1)Shop

public class Shop {
    private ThreadLocalRandom random;
    private ExecutorService executorService = Executors.newCachedThreadPool();

    public Future<Double> getPriceAsync(String product){
        CompletableFuture<Double> future = new CompletableFuture<>();
        // 另一个线程计算
        executorService.submit(() -> {
            try {
                double price = calculatePrice(product);
                future.complete(price);
            } catch (Exception e) {
                // 处理异常
                future.completeExceptionally(e);
                e.printStackTrace();
            }
        });
        return future;
    }

    private double calculatePrice(String product){
        random = ThreadLocalRandom.current();
        // 模拟耗时操作
        delay();
        // 随机
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    public static void delay(){
        try {
            Thread.sleep(1000);
            throw new RuntimeException("product is not available");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Shop shop = new Shop();

        Future<Double> price = shop.getPriceAsync("my favorite product");
        // 计算price和doSomethingElse是并发执行的
        doSomethingElse();
        try {
            // 如果此时已经计算完毕,则立即返回;如果没有计算完毕,则会阻塞
            Double result = price.get();
            System.out.println("result is " + result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }

    private static void doSomethingElse() {
        System.out.println("doSomethingElse");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

2) GracefulShop

工厂方法创建的Future自己内部维护了一个线程池。
public class GracefulShop {
    private ThreadLocalRandom random;
    public Future<Double> getPriceAsync(String product){
        // 接收一个Supplier,该Supplier会交由ForkJoinPool池中的某个执行线程执行
        return CompletableFuture.supplyAsync(() -> calculatePrice(product));
    }

    private double calculatePrice(String product){
        random = ThreadLocalRandom.current();
        // 模拟耗时操作
        delay();
        // 随机
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    public static void delay(){
        try {
            Thread.sleep(1000);
            throw new RuntimeException("product is not available");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        GracefulShop shop = new GracefulShop();

        Future<Double> price = shop.getPriceAsync("my favorite product");
        // 计算price和doSomethingElse是并发执行的
        doSomethingElse();
        try {
            // 如果此时已经计算完毕,则立即返回;如果没有计算完毕,则会阻塞
            Double result = price.get();
            System.out.println("result is " + result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }

    private static void doSomethingElse() {
        System.out.println("doSomethingElse");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

# 将批量同步操作转为异步操作(并行流/CompletableFuture)

如果原本的 getPrice 是同步方法的话,那么如果想批量调用 getPrice,提高效率的方法要么使用并行流,要么使用 CompletableFuture。

public class SyncShop {
    private String name;

    public SyncShop(String name) {
        this.name = name;
    }

    public static void delay() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public double getPrice(String product) {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    public String getName() {
        return name;
    }
}

public class BestProductPriceCalculator {
    private List<SyncShop> shops = Arrays.asList(
            new SyncShop("BestPrice"),
            new SyncShop("LetsSaveBig"),
            new SyncShop("MyFavoriteShop"),
            new SyncShop("BuyItAll")
    );

    public List<String> findPricesWithParallelStream(String product) {
        return shops
                .parallelStream()
                .map(shop -> shop.getName() + ":" + shop.getPrice(product))
                .collect(Collectors.toList());
    }

    public List<String> findPricesWithCompletableFuture(String product) {
        List<CompletableFuture<String>> futures = shops
                .stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + ":" + shop.getPrice(product)))
                .collect(Collectors.toList());
        // join方法和Future的get方法有相同的含义,并且也声明在Future接口中,它们唯一的不同就是join不会抛出任何检测到的异常。
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }
}


public class FutureTest {
    private BestProductPriceCalculator calculator = new BestProductPriceCalculator();
    // 1s
    @Test
    public void testParallelStream(){
        calculator.findPricesWithParallelStream("my favorite product");
    }
    //2s
    @Test
    public void testCompletableFuture(){
        calculator.findPricesWithCompletableFuture("my favorite product");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

使用并行流还是 CompletableFuture? 前者是无法调整线程池的大小的(处理器个数),而后者可以。 如果是计算密集型应用,且没有 IO,那么推荐使用并行流 如果是 IO 密集型,需要等待 IO,那么使用 CompletableFuture 灵活性更高,比如根据《Java 并发编程实战》中给出的公式计算线程池合适的大小。

# 多个异步任务合并

逻辑如下: 从每个商店获取 price,price 以某种格式返回。拿到 price 后解析 price,然后调用远程 API 根据折扣计算最终 price。 可以分为三个任务,每个商店都要执行这三个任务。

public class PipelineShop {
    private String name;

    public PipelineShop(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public String getPrice(String product) {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        double price = calculatePrice(product);
        Discount.DiscountCode code = Discount.DiscountCode.values()[random.nextInt(Discount.DiscountCode.values().length)];
        return String.format("%s:%.2f:%s", name, price, code);
    }

    private double calculatePrice(String product) {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        // 模拟耗时操作
        delay();
        // 随机
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    public static void delay() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


public class Discount {
    public enum DiscountCode{
        NONE(0),SILVER(5),GOLD(10),PLATINUM(15),DIAMOND(20);
        private int percentage;
        DiscountCode(int percentage){
            this.percentage = percentage;
        }
    }

     public static String applyDiscount(Quote quote){
        return quote.getShopName()+ " price is " + apply(quote.getPrice(), quote.getDiscountCode());
     }

    private static double apply(double price, DiscountCode discountCode) {
        // 模拟调用远程服务的延迟
        delay();
        return price * ( 100 - discountCode.percentage ) / 100 ;
    }

}

public class Quote {
    private String shopName;
    private double price;
    private Discount.DiscountCode discountCode;

    public Quote(String shopName, double price, Discount.DiscountCode discountCode) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = discountCode;
    }

    public static Quote parse(String str){
        String [] slices = str.split(":");
        return new Quote(slices[0],Double.parseDouble(slices[1]),Discount.DiscountCode.valueOf(slices[2]));
    }

    public String getShopName() {
        return shopName;
    }

    public double getPrice() {
        return price;
    }

    public Discount.DiscountCode getDiscountCode() {
        return discountCode;
    }
}


public class BestProductPriceWithDiscountCalculator {
    private List<PipelineShop> shops = Arrays.asList(
            new PipelineShop("BestPrice"),
            new PipelineShop("LetsSaveBig"),
            new PipelineShop("MyFavoriteShop"),
            new PipelineShop("BuyItAll")
    );

    public List<String> findPricesWithPipeline(String product) {
        List<CompletableFuture<String>> futures = shops
                .stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product)))
                .map(future -> future.thenApply(Quote::parse))
                .map(future -> future.thenCompose(
                        quote -> CompletableFuture.supplyAsync(
                                () -> Discount.applyDiscount(quote)
                        )
                ))
                .collect(Collectors.toList());
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109

# 回调

public class CallbackBestProductPriceCalculator {
    private List<PipelineShop> shops = Arrays.asList(
            new PipelineShop("BestPrice"),
            new PipelineShop("LetsSaveBig"),
            new PipelineShop("MyFavoriteShop"),
            new PipelineShop("BuyItAll")
    );

    public Stream<CompletableFuture<String>> findPricesStream(String product) {
        return shops
                .stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product)))
                .map(future -> future.thenApply(Quote::parse))
                .map(future -> future.thenCompose(
                        quote -> CompletableFuture.supplyAsync(
                                () -> Discount.applyDiscount(quote)
                        )
                ));
    }
}


@Test
public void testCallback(){
    CompletableFuture[] futures = callbackBestProductPriceCalculator.findPricesStream("my favorite product").map(
            future -> future.thenAccept(System.out::println)
    ).toArray(size -> new CompletableFuture[size]);
    CompletableFuture.allOf(futures).join();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# API

CompletableFuture 类实现了 CompletionStage 和 Future 接口。Future 是 Java 5 添加的类,用来描述一个异步计算的结果,但是获取一个结果时方法较少,要么通过轮询 isDone,确认完成后,调用 get()获取值,要么调用 get()设置一个超时时间。但是这个 get()方法会阻塞住调用线程,这种阻塞的方式显然和我们的异步编程的初衷相违背。 为了解决这个问题,JDK 吸收了 guava 的设计思想,加入了 Future 的诸多扩展功能形成了 CompletableFuture。

CompletionStage 是一个接口,从命名上看得知是一个完成的阶段,它里面的方法也标明是在某个运行阶段得到了结果之后要做的事情。、

# supplyAsync 提交任务

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
1
2

# thenApply 变换(等待前一个任务返回后执行,处于同一个 CompletableFuture)

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
1
2
3

首先说明一下以 Async 结尾的方法都是可以异步执行的,如果指定了线程池,会在指定的线程池中执行,如果没有指定,默认会在 ForkJoinPool.commonPool()中执行,下文中将会有好多类似的,都不详细解释了。关键的入参只有一个 Function,它是函数式接口,所以使用 Lambda 表示起来会更加优雅。它的入参是上一个阶段计算后的结果,返回值是经过转化后结果。 不带 Async 的方法会在和前一个任务相同的线程中处理; 以 Async 的方法会将任务提交到一个线程池,所有每个任务是由不同的线程处理的。

public void thenApply() {
    String result = CompletableFuture.supplyAsync(() -> "hello").thenApply(s -> s + " world").join();
    System.out.println(result);
}
1
2
3
4

# thenAccept 消耗

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

public void thenAccept() {
    CompletableFuture.supplyAsync(() -> "hello").thenAccept(s -> System.out.println(s + " world"));
}
1
2
3
4
5
6
7

# thenRun 执行下一步操作,不关心上一步结果

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

thenRun它的入参是一个Runnable的实例,表示当得到上一步的结果时的操作。
public void thenRun() {
    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello";
    }).thenRun(() -> System.out.println("hello world"));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# thenCombine 结合两个 CompletionStage 的结果,进行转化后返回

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
1
2
3

它需要原来的处理返回值,并且 other 代表的 CompletionStage 也要返回值之后,利用这两个返回值,进行转换后返回指定类型的值。

public void thenCombine() {
    String result = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello";
    }).thenCombine(CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "world";
    }), (s1, s2) -> s1 + " " + s2).join();
    System.out.println(result);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# thenCompose(合并多个 CompletableFuture,流水线执行,在调用外部接口返回 CompletableFuture 类型时更方便)

<U> CompletableFuture<U> thenCompose(Function<? super T,CompletableFuture<U>> fn);
1
  • thenCompose 方法允许对两个异步操作(supplyAsync)进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作。
  • 创建两个 CompletableFuture,对第一个 CompletableFuture 对象调用 thenCompose,并向其传递一个函数。当第一个 CompletableFuture 执行完毕后,它的结果将作为该函数的参数,这个函数的返回值是以第一个 CompletableFuture 的返回做输入计算出的第二个 CompletableFuture 对象。

# thenAccptBoth 结合两个 CompletionStage 的结果,进行消耗

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);
1
2
3

它需要原来的处理返回值,并且 other 代表的 CompletionStage 也要返回值之后,利用这两个返回值,进行消耗。

public void thenAcceptBoth() {
    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello";
    }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "world";
    }), (s1, s2) -> System.out.println(s1 + " " + s2));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# runAfterBoth 在两个 CompletionStage 都运行完执行,不关心上一步结果

public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
1
2
3

不关心这两个 CompletionStage 的结果,只关心这两个 CompletionStage 执行完毕,之后在进行操作(Runnable)。

public void runAfterBoth() {
    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "s1";
    }).runAfterBothAsync(CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "s2";
    }), () -> System.out.println("hello world"));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# applyToEither 两个 CompletionStage,谁计算的快,我就用那个 CompletionStage 的结果进行下一步的转化操作

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
1
2
3

我们现实开发场景中,总会碰到有两种渠道完成同一个事情,所以就可以调用这个方法,找一个最快的结果进行处理。

public void applyToEither() {
    String result = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "s1";
    }).applyToEither(CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello world";
    }), s -> s).join();
    System.out.println(result);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# acceptEither 两个 CompletionStage,谁计算的快,我就用那个 CompletionStage 的结果进行下一步的消耗操作

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);

public void acceptEither() {
    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "s1";
    }).acceptEither(CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello world";
    }), System.out::println);
    while (true) {
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# runAfterEither 两个 CompletionStage,任何一个完成了都会执行下一步的操作,不关心上一步结果

public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
public void runAfterEither() {
    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "s1";
    }).runAfterEither(CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "s2";
    }), () -> System.out.println("hello world"));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# exceptionally 当运行时出现了异常,可以进行补偿

public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
public void exceptionally() {
    String result = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (1 == 1) {
            throw new RuntimeException("测试一下异常情况");
        }
        return "s1";
    }).exceptionally(e -> {
        System.out.println(e.getMessage());
        return "hello world";
    }).join();
    System.out.println(result);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# whenComplete 当运行完成时,若有异常则改变返回值,否则返回原值

public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);

public void whenComplete() {
    String result = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (1 == 1) {
            throw new RuntimeException("测试一下异常情况");
        }
        return "s1";
    }).whenComplete((s, t) -> {
        System.out.println(s);
        System.out.println(t.getMessage());
    }).exceptionally(e -> {
        System.out.println(e.getMessage());
        return "hello world";
    }).join();
    System.out.println(result);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

null java.lang.RuntimeException: 测试一下异常情况 java.lang.RuntimeException: 测试一下异常情况 hello world 这里也可以看出,如果使用了 exceptionally,就会对最终的结果产生影响,它无法影响如果没有异常时返回的正确的值,这也就引出下面我们要介绍的 handle。

# handle 当运行完成时,无论有无异常均可转换

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

public void handle() {
    String result = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //出现异常
        if (1 == 1) {
            throw new RuntimeException("测试一下异常情况");
        }
        return "s1";
    }).handle((s, t) -> {
        if (t != null) {
            return "hello world";
        }
        return s;
    }).join();
    System.out.println(result);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

hello world

public void handle() {
    String result = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "s1";
    }).handle((s, t) -> {
        if (t != null) {
            return "hello world";
        }
        return s;
    }).join();
    System.out.println(result);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

s1

# allOf

allOf 工厂方法接收一个由 CompletableFuture 构成的数组,数组中的所有 CompletableFuture 对象执行完成之后,它返回一个 CompletableFuture <Void>对象。这意味着,如果你需要等待最初 Stream 中的所有 CompletableFuture 对象执行完毕,对 allOf 方法返回的 CompletableFuture 执行 join 操作是个不错的注意。

# anyOf

只要 CompletableFuture 对象数组中有一个执行完毕,便不再等待。

# ForkJoin

双端队列 LinkedBlockingDeque 适用于另一种相关模式,即工作密取(work stealing)。在生产者——消费者设计中,所有消费者有一个共享的工作队列,而在工作密取设计中,每个消费者都有各自的双端队列。如果一个消费者完成了自己双端队列中的全部工作,那么它可以从其他消费者双端队列头部秘密地获取工作。密取工作模式比传统的生产者——消费者模式具有更高的可伸缩性,这是因为工作者线程不会在单个共享的任务队列上发生竞争。在大多数时候,它们都只是访问自己的双端队列,从而极大地减少了竞争。当工作者线程需要访问另一个队列时,它会从队列的头部而不是从尾部获取工作,因此进一步降低了队列上的竞争程度。 图片12.jpg 图片13.jpg

  • 第一步分割任务。首先我们需要有一个 fork 类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。
  • 第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

Fork/Join 使用两个类来完成以上两件事情:

  • ForkJoinTask:我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork()和 join()操作的机制,通常情况下我们不需要直接继承- ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:
    • oRecursiveAction:用于没有返回结果的任务。
    • oRecursiveTask :用于有返回结果的任务。
  • ForkJoinPool :ForkJoinTask 需要通过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

threshold 临界值

RecursiveTask 有两个方法:fork 和 join fork 是执行子任务,join 是取得子任务的结果,用于合并

public class TestForkJoin {
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		ForkJoinPool pool = new ForkJoinPool();
		ForkJoinCalculator calculator = new ForkJoinCalculator(0, 10000000L);
		Long result = pool.invoke(calculator);
		System.out.println(result);
		pool.shutdown();
	}
}

class ForkJoinCalculator extends RecursiveTask<Long> {

	private static final long serialVersionUID = -6682191224530210391L;

	private long start;
	private long end;
	private static final long THRESHOLD = 10000L;

	public ForkJoinCalculator(long start, long end) {
		this.start = start;
		this.end = end;
	}

	@Override
	protected Long compute() {
		long length = end - start;
		if (length < THRESHOLD) {
			long sum = 0L;
			for (long i = start; i < end; ++i) {
				sum += i;
			}
			return sum;
		} else {
			long middle = (start + end) / 2;
			ForkJoinCalculator left = new ForkJoinCalculator(start, middle);
			left.fork();
			ForkJoinCalculator right = new ForkJoinCalculator(middle, end);
			right.fork();
			return left.join() + right.join();
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

# 原理浅析

    1. 每个 Worker 线程都维护一个任务队列,即 ForkJoinWorkerThread 中的任务队列。
  • . 任务队列是双向队列,这样可以同时实现 LIFO 和 FIFO。
  • . 子任务会被加入到原先任务所在 Worker 线程的任务队列。
    1. Worker 线程用 LIFO 的方法取出任务,也就后进队列的任务先取出来(子任务总是后加入队列,但是需要先执行)。
    1. Worker 线程的任务队列为空,会随机从其他的线程的任务队列中拿走一个任务执行(所谓偷任务:steal work,FIFO 的方式)。
    1. 如果一个 Worker 线程遇到了 join 操作,而这时候正在处理其他任务,会等到这个任务结束。否则直接返回。
    1. 如果一个 Worker 线程偷任务失败,它会用 yield 或者 sleep 之类的方法休息一会儿,再尝试偷任务(如果所有线程都是空闲状态,即没有任务运行,那么该线程也会进入阻塞状态等待新任务的到来)。

# 与 MapReduce 的区别

MapReduce 是把大数据集切分成小数据集,并行分布计算后再合并。

ForkJoin 是将一个问题递归分解成子问题,再将子问题并行运算后合并结果。

二者共同点:都是用于执行并行任务的。基本思想都是把问题分解为一个个子问题分别计算,再合并结果。应该说并行计算都是这种思想,彼此独立的或可分解的。从名字上看 Fork 和 Map 都有切分的意思,Join 和 Reduce 都有合并的意思,比较类似。

区别:

  • 1)环境差异,分布式 vs 单机多核:ForkJoin 设计初衷针对单机多核(处理器数量很多的情况)。MapReduce 一开始就明确是针对很多机器组成的集群环境的。也就是说一个是想充分利用多处理器,而另一个是想充分利用很多机器做分布式计算。这是两种不同的的应用场景,有很多差异,因此在细的编程模式方面有很多不同。
  • 2)编程差异:MapReduce 一般是:做较大粒度的切分,一开始就先切分好任务然后再执行,并且彼此间在最后合并之前不需要通信。这样可伸缩性更好,适合解决巨大的问题,但限制也更多。ForkJoin 可以是较小粒度的切分,任务自己知道该如何切分自己,递归地切分到一组合适大小的子任务来执行,因为是一个 JVM 内,所以彼此间通信是很容易的,更像是传统编程方式。

# 线程池使用

# 引入原因

  • 1)任务处理过程从主线程中分离出来,使得主循环能够更快地重新等待下一个到来的连接,使得任务在完成前面的请求之前可以接受新的请求,从而提高响应性。
  • 2)任务可以并行处理,从而能同时服务多个请求。如果有多个处理器,或者任务由于某种原因被阻塞,程序的吞吐量将得到提高。
  • 3)任务处理代码必须是线程安全的,因为当有多个任务时会并发地调用这段代码。

无限制创建线程的不足:

  • 1)线程生命周期的开销非常高 2)资源消耗
  • 3)稳定性

解决方式:线程池 Executor 框架

使用线程池的好处:

  • 1)降低资源消耗
  • 2)提高响应速度
  • 3)提高线程的可管理性

# Executor ExecutorService ScheduledExecutorService

继承体系 图片14.jpg 图片15.jpg 图片16.jpg

# ExecutorService

图片17.jpg

# ScheduledExecutorService

图片18.jpg

返回值

图片19.jpg

示例

public class QuoteTask implements Callable<TravelQuote> {
    private final TravelCompany company;
    private final TravelInfo travelInfo;
    private ExecutorService exec;
    public QuoteTask(TravelCompany company, TravelInfo travelInfo) {
        this.company = company;
        this.travelInfo = travelInfo;
    }

    public TravelQuote call() throws Exception {
        return company.solicitQuote(travelInfo);
    }

    public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo, Set<TravelCompany> companies, Comparator<TravelQuote> ranking, long time, TimeUnit unit) throws InterruptedException {
        //任务
        List<QuoteTask> tasks = new ArrayList<QuoteTask>();
        for (TravelCompany company : companies) {
            tasks.add(new QuoteTask(company,travelInfo));
        }
        //执行
        List<Future<TravelQuote>> futures =  exec.invokeAll(tasks,time,unit);
        List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size());
        Iterator<QuoteTask> taskIterator = tasks.iterator();
        //取出结果
        for(Future<TravelQuote> future:futures){
            QuoteTask task = taskIterator.next();
            try {
                quotes.add(future.get());
            } catch (ExecutionException e) {
                quotes.add(task.getFailureQuote(e.getCause()));
                e.printStackTrace();
            }catch(CancellationException e){
                quotes.add(task.getTimeOutQuote(e));
            }
        }
        Collections.sort(quotes,ranking);
        return quotes;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# ThreadPoolExecutor

创建线程池 图片20.jpg

线程动态变化

  • 1.当线程池小于 corePoolSize 时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
  • 2.当线程池达到 corePoolSize 时,新提交任务将被放入 workQueue 中,等待线程池中任务调度执行
  • 3.当 workQueue 已满,且 maximumPoolSize>corePoolSize 时,新提交任务会创建新线程执行任务
  • 4.当提交任务数超过 maximumPoolSize 时,新提交任务由 RejectedExecutionHandler 处理
  • 5.当线程池中超过 corePoolSize 线程,空闲时间达到 keepAliveTime 时,关闭空闲线程
  • 6.当设置 allowCoreThreadTimeOut(true)时,线程池中 corePoolSize 线程空闲时间达到 keepAliveTime 也将关闭

创建一个线程池时需要以下几个参数:

  • 1)corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用线程池的 prestartAllCoreThreads 方法,线程池会提前创建并启动所有基本线程。
  • 2)runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列:
    • a)ArrayBlockingQueue:基于数组的有界阻塞队列,FIFO
    • b) LinkedBlockingQueue:基于链表的无界阻塞队列,FIFO,吞吐量高于 ArrayBlockingQueue,Executors.newFixedThreadPoll()使用了这个队列
    • c)SynchronousQueue:一个只存储一个元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入一直处于阻塞状态,吞吐量高于 LinkedBlockingQueue,Executors#newCachedThreadPoll()使用了这个队列
    • d)PriorityBlockingQueue:具有优先级的无界阻塞队列
  • 3)maximumPoolSize(线程池的最大数量):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。如果使用了无界队列该参数就没有意义了。
  • 4)ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
  • 5)RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,或者当线程池已关闭时,会采用一种策略处理提交的新任务。这个策略默认是 AbortPolicy,表示无法处理新任务时抛出异常。有以下四种饱和策略:
    • a)AbortPolicy:直接抛出异常
    • b)CallerRunsPolicy:使用调用者所在线程来运行任务
    • c)DiscardOldestPolicy:丢弃队列中最近的一个任务,并执行当前任务
    • d)DiscardPolicy:不处理,直接丢弃 也可以自定义饱和策略。
  • 6)keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。出现 timeout 情况下,而且线程数超过了核心线程数,会销毁销毁线程。保持在 corePoolSize 数。除非设置了 allowCoreThreadTimeOut 和超时时间,这种情况线程数可能减少到 0,最大可能是 Integer.MAX_VALUE。 如果任务很多,每个任务执行的时间比较短,可以调大时间,提高线程的利用率。
    • allowCoreThreadTimeOut 为 true 该值为 true,则线程池数量最后销毁到 0 个。
    • allowCoreThreadTimeOut 为 false 销毁机制:超过核心线程数时,而且(超过最大值或者 timeout 过),就会销毁。
  • 7)TimeUnit(线程活动保持时间的单位)

# 使用注意

  • 1、只有当任务都是同类型并且相互独立时,线程池的性能才能达到最佳。如果将运行时间较长的与运行时间较短的任务混合在一起,那么除非线程池很大,否则将可能造成拥塞。如果提交的任务依赖于其他任务,那么除非线程池无限大,否则将可能造成死锁。幸运的是,在基于网络的典型服务器应用程序中——web 服务器、邮件服务器、文件服务器等,它们的请求通常都是同类型的并且相互独立的。
  • 2、设置线程池的大小: 基于 Runtime.getRuntime().avialableprocessors() 进行动态计算 对于计算密集型的任务,在 N 个处理器的系统上,当线程池为 N+1 时,通过能实现最优的利用率(缺页故障等暂停时额外的线程也能确保 CPU 时钟周期不被浪费)。 对于包含 IO 操作或者其他阻塞操作的任务,由于线程并不会一直执行,因此线程池的规模应该更大,比如 2N。要正确地设置线程池的大小,你必须估算出任务的等待时间与计算时间的比值。线程等待时间所占比例越高,需要越多线程。线程 CPU 时间所占比例越高,需要越少线程。这种估算不需要很精确,而且可以通过一些分析或监控工具来获得。你还可以通过另一种方法来调节线程池的大小:在某个基准负载下,分别设置不同大小的线程池来运行应用程序,并观察 CPU 利用率。 最佳线程数目 = (线程等待时间与线程计算时间之比 + 1) CPU 数目
  • 3、线程的创建与销毁 基本大小也就是线程池的目标大小,即在没有任务执行时线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。线程池的最大大小表示可同时活动的线程数量的上限。如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小时,这个线程将被终止。
  • 4、管理队列任务 ThreadPoolExecutor 允许提供一个 BlockingQueue 来保存等待执行的任务。基本的任务排队方法有 3 种:无界队列、有界队列和同步移交。 一种稳妥的资源管理策略是使用有界队列,有界队列有助于避免资源耗尽的情况发生,但又带来了新的问题:当队列填满后,新的任务该怎么办?
  • 5、饱和策略 当有界队列被填满后,饱和策略开始发挥作用。ThreadPoolExecutor 的饱和策略可以通过 setRejectedExecutionHandler 来修改。JDK 提供了几种不同的 RejectedExecutionHandler 的实现,每种实现都包含有不同的饱和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。
    • 1)中止策略是默认的饱和策略,该策略将抛出未检查的 RejectedExecutionException。调用者可以捕获这个异常,然后根据需求编写自己的处理代码。
    • 2)当新提交的任务无法保存到队列中执行时,抛弃策略会悄悄抛弃该任务。
    • 3)抛弃最旧的策略则会抛弃下一个将被执行的任务,然后尝试重新提交下一个将被执行的任务(如果工作队列是一个优先级队列,那么抛弃最旧的将抛弃优先级最高的任务)
    • 4)调用者运行策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退给调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了 execute 的线程中执行该任务。为什么好?因为当服务器过载时,这种过载情况会逐渐向外蔓延开来——从线程池到工作队列到应用程序再到 TCP 层,最终达到客户端,导致服务器在高负载下实现一种平缓的性能降低。
  • 6、线程工厂 在许多情况下都需要使用定制的线程工厂方法。例如,你希望为线程池中的线程指定一个 UncaughtExceptionHandler,或者实例化一个定制的 Thread 类用于执行调试信息的记录,你还可能希望修改线程的优先级(虽然不提倡这样做),或者只是给线程取一个更有意义的名字,用来解释线程的转储信息和错误日志。
  • 7、在调用构造函数后再定制 ThreadPoolExecutor

# 扩展 ThreadPoolExecutor

public class TimingThreadPool extends ThreadPoolExecutor {
    public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
    private final Logger log = Logger.getLogger("TimingThreadPool");
    private final AtomicLong numTasks = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t,r);
        log.fine(String.format("Thread %s :start %s",t,r));
        startTime.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try{
            long endTime = System.nanoTime();
            long taskTime = endTime - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
            log.fine(String.format("Thread %s : end %s ,time = %dns",t,r,taskTime));
        }finally {
            super.afterExecute(r, t);
        }
    }

    @Override
    protected void terminated() {
        try {
            log.info(String.format("Terminated : avg time = %dns",totalTime.get() / numTasks.get()));
        } finally {
            super.terminated();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

# 任务时限

Future 的 get 方法可以限时,如果超时会抛出 TimeOutException,那么此时可以通过 cancel 方法来取消任务。如果编写的任务是可取消的,那么可以提前中止它,以免消耗过多的资源。 创建 n 个任务,将其提交到一个线程池,保留 n 个 Future,并使用限时的 get 方法通过 Future 串行地获取每一个结果,这一切都很简单。但还有一个更简单的实现:invokeAll。 将多个任务提交到一个 ExecutorService 并获得结果。invokeAll 方法的参数是一组任务,并返回一组 Future。这两个集合有着相同的结构。invokeAll 按照任务集合中迭代器的顺序将所有的 Future 添加到返回的集合中,从而使调用者能将各个 Future 与其表示的 Callable 关联起来。当所有任务执行完毕时,或者调用线程被中断时,又或者超时,invokeAll 将返回。当超时时,任何还未完成的任务都会取消。当 invokeAll 返回后,每个任务要么正常地完成,要么被取消,而客户端代码可以调用 get 或 isCancelled 来判断究竟是何种情况。

public class QuoteTask implements Callable<TravelQuote> {
    private final TravelCompany company;
    private final TravelInfo travelInfo;
    private ExecutorService exec;
    public QuoteTask(TravelCompany company, TravelInfo travelInfo) {
        this.company = company;
        this.travelInfo = travelInfo;
    }

    public TravelQuote call() throws Exception {
        return company.solicitQuote(travelInfo);
    }

    public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo, Set<TravelCompany> companies, Comparator<TravelQuote> ranking, long time, TimeUnit unit) throws InterruptedException {
        //任务
        List<QuoteTask> tasks = new ArrayList<QuoteTask>();
        for (TravelCompany company : companies) {
            tasks.add(new QuoteTask(company,travelInfo));
        }
        //执行
        List<Future<TravelQuote>> futures =  exec.invokeAll(tasks,time,unit);
        List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size());
        Iterator<QuoteTask> taskIterator = tasks.iterator();
        //取出结果
        for(Future<TravelQuote> future:futures){
            QuoteTask task = taskIterator.next();
            try {
                quotes.add(future.get());
            } catch (ExecutionException e) {
                quotes.add(task.getFailureQuote(e.getCause()));
                e.printStackTrace();
            }catch(CancellationException e){
                quotes.add(task.getTimeOutQuote(e));
            }
        }
        Collections.sort(quotes,ranking);
        return quotes;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# 任务关闭

线程有一个相应的所有者,即创建该线程的类,因此线程池是工作者线程的所有者,如果要中断这些线程,那么应该使用线程池。 ExecutorService 中提供了 shutdown 和 shutdownNow 方法。 前者是正常关闭,后者是强行关闭。

  • 1)它们都会阻止新任务的提交
  • 2)正常关闭是停止空闲线程,正在执行的任务继续执行并完成所有未执行的任务
  • 3)强行关闭是停止所有(空闲+工作)线程,关闭当前正在执行的任务,然后返回所有尚未执行的任务。

通常调用 shutdown 方法来关闭线程池,如果任务不一定要执行完,则可以调用 shutdownNow 方法。

但是我们无法通过常规方法来找出哪些任务已经开始但尚未结束,这意味着我们无法在关闭过程中知道正在执行的任务的状态,除非任务本身会执行某种检查。要知道哪些任务还没有完成,你不仅需要知道哪些任务还没有开始,而且还需要知道当 Executor 关闭时哪些任务正在执行。


处理非正常的线程终止(只对 execute 提交的任务有效,submit 提交的话会在 future.get 时将受检异常直接抛出)

要为线程池中的所有线程设置一个 UncaughtExceptionHandler,需要为 ThreadPoolExecutor 的构造函数提供一个 ThreadFactory。标准线程池允许当发生未捕获异常时结束线程,但由于使用了一个 try-finally 块来接收通知,因此当线程结束时,将有新的线程来代替它。如果没有提供捕获异常处理器或者其他的故障通知机制,那么任务会悄悄失败,从而导致很大的混乱。如果你希望在任务由于发生异常而失败时获得通知,并且执行一些特定于任务的恢复操作,那么可以将任务封装在能捕获异常的 Runnable 或 Callable 中,或者改写 ThreadPoolExecutor 的 afterExecute 方法。

只有通过 execute 提交的任务,才能将它抛出的异常交给未捕获异常处理器。如果一个由 submit 提交的任务由于抛出了异常而结束,那么这个异常将被 Future.get 封装在 ExecutionException 中重新抛出。

public class QuoteTask implements Callable<TravelQuote> {
    private final TravelCompany company;
    private final TravelInfo travelInfo;
    private ExecutorService exec;
    public QuoteTask(TravelCompany company, TravelInfo travelInfo) {
        this.company = company;
        this.travelInfo = travelInfo;
    }

    public TravelQuote call() throws Exception {
        return company.solicitQuote(travelInfo);
    }

    public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo, Set<TravelCompany> companies, Comparator<TravelQuote> ranking, long time, TimeUnit unit) throws InterruptedException {
        //任务
        List<QuoteTask> tasks = new ArrayList<QuoteTask>();
        for (TravelCompany company : companies) {
            tasks.add(new QuoteTask(company,travelInfo));
        }
        //执行
        List<Future<TravelQuote>> futures =  exec.invokeAll(tasks,time,unit);
        List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size());
        Iterator<QuoteTask> taskIterator = tasks.iterator();
        //取出结果
        for(Future<TravelQuote> future:futures){
            QuoteTask task = taskIterator.next();
            try {
                quotes.add(future.get());
            } catch (ExecutionException e) {
                quotes.add(task.getFailureQuote(e.getCause()));
                e.printStackTrace();
            }catch(CancellationException e){
                quotes.add(task.getTimeOutQuote(e));
            }
        }
        Collections.sort(quotes,ranking);
        return quotes;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# ScheduledThreadPoolExecutor

它继承自 ThreadPoolExecutor,主要用来在给定的延迟之后运行任务,或者定期执行任务。Timer 是单个后台线程,而 ScheduledThreadPoolExecutor 可以在构造函数中指定多个对应的后台线程数。

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}
1
2
3
4
5
6

内部工作队列是 DelayedWorkQueue,它是一个无界队列,maxPoolSize 这个参数没有意义。

static class DelayedWorkQueue extends AbstractQueue<Runnable>
    implements BlockingQueue<Runnable>

public class TestScheduledThreadPool {
   public static void main(String[] args) throws InterruptedException, ExecutionException {
      ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
      for(int i = 0; i < 10 ;++i){
         Future<Integer> result = pool.schedule(new ThreadPoolDemo2(), 800, TimeUnit.MILLISECONDS);
         System.out.println(result.get());
      }
      pool.shutdown();
   }
}

class ThreadPoolDemo2 implements Callable<Integer> {
   @Override
   public Integer call() throws Exception {
      int sum = 0;
      for (int i = 0; i < 100; ++i) {
         sum += i;
         System.out.println(Thread.currentThread().getName() + "\t" + i);
      }
      return sum;
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# Executors

Executors 是一个工厂类,可以创建 3 种类型的 ThreadPoolExecutor 和 2 种类型的 ScheduledThreadPool。 图片21.jpg

# FixedThreadPool

创建固定线程数的 FixedThreadPool,适用于负载比较重的服务器。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
1
2
3
4
5

corePoolSize 和 maxPoolSize 都被设置为创建 FixedThreadPoolExecutor 时指定的参数 nThreads。

keepAliveTime 为 0 表示多余的空闲线程将会被立即终止。

使用无界队列 LinkedBlockingQueue 来作为线程池的工作队列,并且默认容量为 Integer.MAX_VALUE。使用无界队列会带来以下影响:

  • 1)当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize
  • 2)maximumPoolSize 是一个无效的参数
  • 3)keepAliveTime 是一个无效参数
  • 4)运行中的 FixedThreadPool(未执行 shutdown 或 shutdownNow)不会拒绝任务。

# SingleThreadExecutor

适用于需要保证顺序地执行各个任务,并且在任意时间点不会有多个线程活动的应用场景。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
1
2
3
4
5
6

它也是使用无界队列,corePoolSize 和 maxPoolSize 都为 1。

# CachedThreadPool

大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
1
2
3
4
5

使用没有缓冲区、只能存储一个元素的 SynchronousQueue 作为工作队列。

maxPoolSize 是无界的,如果主线程提交任务的速度高于 maxPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新线程。极端情况下,CachedThreadPool 会因为创建过多线程而耗尽 CPU 和内存。 图片22.jpg

任务执行过程:

  • 1)首先执行 SynchronousQueue#offer(Runnable) 。如果当前 maxPool 中有空闲线程正在执行 SynchronousQueue#poll,那么主线程执行 offer 操作与空闲线程执行的 poll 操作配对成功,主线程把任务交给空闲线程执行;否则执行 2)
  • 2)当初始 maxPool 为空,或者 maxPool 中没有空闲线程时,此时 CachedThreadPool 会创建一个新线程执行任务
  • 3)在 2)中新创建的线程执行任务完毕后,会执行 SynchronousQueue#poll,这个 poll 操作会让空闲线程最多在 SynchronousQueue 中等待 60 秒。如果 60 秒内主线程提交了一个新任务,那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。

图片23.jpg

# ScheduledThreadPoolExecutor

固定线程个数,适用于多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的梳理的应用场景。

# SingleThreadScheduledExecutor

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1, threadFactory));
}
1
2
3
4

适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景。

# CompletionService

CompletionService 将 Executor 和 BlockingQueue 的概念融合在一起,你可以将 Callable 任务提交给它来执行,然后使用类似于队列操作的 take 和 poll 等方法来获得已完成的结果,而这些结果会在完成时封装为 Future。ExecutorCompletionService 实现了 CompletionService 并将计算任务委托给一个 Executor。

ExecutorCompletionService 的实现非常简单,在构造函数中创建一个 BlockingQueue 来保存计算完成的结果。当计算完成时,调用 FutureTask 的 done 方法。当提交某个任务时,该任务将首先包装为一个 QueueingFuture,这是 FutureTask 的一个子类,然后再改写子类的 done 方法,并将结果放入 BlockingQueue 中。take 和 poll 方法委托给了 BlockingQueue,这些方法会在得出结果之前阻塞。

多个 ExecutorCompletionService 可以共享一个 Executor,因此可以创建一个对于特定计算私有,又能共享一个公共 Executor 的 ExecutorCompletionService。

public class CompletionServiceTest {
    public void test() throws InterruptedException, ExecutionException {
        ExecutorService exec = Executors.newCachedThreadPool();
        CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(exec);
        for (int i = 0; i < 10; i++) {
            completionService.submit(new Task());
        }
        int sum = 0;
        for (int i = 0; i < 10; i++) {
        //检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。
            Future<Integer> future = completionService.take();
            sum += future.get();
        }
        System.out.println("总数为:" + sum);
        exec.shutdown();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# J.U.C 源码解析

实现整个并发体系的真正底层是 CPU 提供的 lock 前缀+cmpxchg 指令和 POSIX 的同步原语(mutex&condition)

synchronized 和 wait&notify 基于 JVM 的 monitor,monitor 底层又是基于 POSIX 同步原语。

volatile 基于 CPU 的 lock 前缀指令实现内存屏障。

而 J.U.C 是基于 LockSupport,底层基于 POSIX 同步原语。

# AbstractQueuedSynchronizer(AQS)

在 ReentrantLock 和 Semaphore 这两个接口之间存在许多共同点,这两个类都可以用作一个阀门,即每次只允许一定数量的线程通过,并当线程到达阀门时,可以通过(在调用 lock 或 acquire 时成功返回),也可以等待(在调用 lock 或 acquire 时阻塞),还可以取消(在调用 tryLock 或 tryAcquire 时返回假,表示在指定的时间内锁是不可用的或无法得到许可)。

可以通过锁来实现计数信号量。

事实上,它们在实现时都使用了一个共同的基类,即 AbstractQueuedSynchronizer(AQS),这个类也是其他许多同步类的基类。AQS 是一个用于构建锁和同步器的框架,许多同步器都可以通过 AQS 很容易并且高效地构造出来。不仅 ReentrantLock 和 Semaphore,还包括 CountDownLatch、ReentrantReadWriteLock、SynchronousQueue 和 FutureTask,都是基于 AQS 构造的。

在基于 AQS 构建的同步器中,只可能在一个时刻发生阻塞,从而降低上下文切换的开销,并提高吞吐量。在设计 AQS 时充分考虑了可伸缩性,因此 java.util.concurrent 中所有基于 AQS 构建的同步器都能获得这个优势。

在基于 AQS 构建的同步器类中,最基本的操作包括各种形式的获取操作和释放操作。获取操作是一种依赖状态的操作,并且通常会阻塞。当使用锁或信号量时,获取操作的含义就很直观,即获取的是锁或许可,并且调用者可能会一直等待直到同步器类处于可被获取的状态。

AQS 负责管理同步器类中的状态,它管理了一个整数类型的状态信息,可以通过 getState、setState 以及 compareAndSetState 等 protected 类型方法来进行操作。这个整数可以用于表示任意状态。 图片24.jpg

它使用了一个 int 成员变量表示同步状态,通过内置的 FIFO 队列来完成资源获取线程的排队工作。

子类通过继承 AQS 并实现它的抽象方法来管理同步状态,修改同步状态依赖于 AQS 的 getState、setState、compareAndSetState 来进行操作,它们能够保证状态的改变是安全的。

子类推荐被定义为自定义同步组件的静态内部类,AQS 自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用。AQS 既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态。

# AQS 的接口

AQS 的设计是基于模板方法模式的,使用者需要继承同步器并重写指定的方法,随后将 AQS 组合在自定义同步组件的实现中,并调用 AQS 提供的模板方法,而这些模板方法将会调用使用者重写的方法。

同步器可重写的方法: 图片25.jpg

同步器提供的模板方法: 图片26.jpg

# AQS 使用实例(互斥锁,tryAcquire 只需一次 CAS)

public class Mutex implements Lock {
    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (super.compareAndSetState(0, 1)) {
                super.setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (super.getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            super.setExclusiveOwnerThread(null);
            super.setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return super.getState() == 1;
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

# AQS 实现

主要工作基于 CLH 队列,voliate 关键字修饰的状态 state,线程去修改状态成功了就是获取成功,失败了就进队列等待,等待唤醒。在等待唤醒的时候,很多时候会使用自旋(while(!cas()))的方式,不停的尝试获取锁,直到被其他线程获取成功。

# AQS#state getState setState

/**
 * The synchronization state.
 */
private volatile int state;

/**
 * Returns the current value of synchronization state.
 * This operation has memory semantics of a {@code volatile} read.
 * @return current state value
 */
protected final int getState() {
    return state;
}

/**
 * Sets the value of synchronization state.
 * This operation has memory semantics of a {@code volatile} write.
 * @param newState the new state value
 */
protected final void setState(int newState) {
    state = newState;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# 同步队列

AQS 依赖内部的 CLH 同步队列(一个 FIFO 双向队列)来完成同步状态的管理。当前线程获取同步状态失败时,AQS 会将当前线程以及等待状态等信息构造为一个 Node 并将其加入同步队列,并阻塞当前线程。当同步状态释放时,会把后继节点线程唤醒,使其再次尝试获取同步状态。后继节点将会在获取同步状态成功时将自己设置为头节点。

# AQS#Node

图片27.jpg

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    /**
     * Status field, taking on only the values:
     *   SIGNAL:     The successor of this node is (or will soon be)
     *               blocked (via park), so the current node must
     *               unpark its successor when it releases or
     *               cancels. To avoid races, acquire methods must
     *               first indicate they need a signal,
     *               then retry the atomic acquire, and then,
     *               on failure, block.
     *   CANCELLED:  This node is cancelled due to timeout or interrupt.
     *               Nodes never leave this state. In particular,
     *               a thread with cancelled node never again blocks.
     *   CONDITION:  This node is currently on a condition queue.
     *               It will not be used as a sync queue node
     *               until transferred, at which time the status
     *               will be set to 0. (Use of this value here has
     *               nothing to do with the other uses of the
     *               field, but simplifies mechanics.)
     *   PROPAGATE:  A releaseShared should be propagated to other
     *               nodes. This is set (for head node only) in
     *               doReleaseShared to ensure propagation
     *               continues, even if other operations have
     *               since intervened.
     *   0:          None of the above
     *
     * The values are arranged numerically to simplify use.
     * Non-negative values mean that a node doesn't need to
     * signal. So, most code doesn't need to check for particular
     * values, just for sign.
     *
     * The field is initialized to 0 for normal sync nodes, and
     * CONDITION for condition nodes.  It is modified using CAS
     * (or when possible, unconditional volatile writes).
     */
    volatile int waitStatus;

    /**
     * Link to predecessor node that current node/thread relies on
     * for checking waitStatus. Assigned during enqueuing, and nulled
     * out (for sake of GC) only upon dequeuing.  Also, upon
     * cancellation of a predecessor, we short-circuit while
     * finding a non-cancelled one, which will always exist
     * because the head node is never cancelled: A node becomes
     * head only as a result of successful acquire. A
     * cancelled thread never succeeds in acquiring, and a thread only
     * cancels itself, not any other node.
     */
    volatile Node prev;

    /**
     * Link to the successor node that the current node/thread
     * unparks upon release. Assigned during enqueuing, adjusted
     * when bypassing cancelled predecessors, and nulled out (for
     * sake of GC) when dequeued.  The enq operation does not
     * assign next field of a predecessor until after attachment,
     * so seeing a null next field does not necessarily mean that
     * node is at end of queue. However, if a next field appears
     * to be null, we can scan prev's from the tail to
     * double-check.  The next field of cancelled nodes is set to
     * point to the node itself instead of null, to make life
     * easier for isOnSyncQueue.
     */
    volatile Node next;

    /**
     * The thread that enqueued this node.  Initialized on
     * construction and nulled out after use.
     */
    volatile Thread thread;

    /**
     * Link to next node waiting on condition, or the special
     * value SHARED.  Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using special value to indicate shared
     * mode.
     */
    Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode.
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * Returns previous node, or throws NullPointerException if null.
     * Use when predecessor cannot be null.  The null check could
     * be elided, but is present to help the VM.
     *
     * @return the predecessor of this node
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135

图片28.jpg

# 独占式同步状态

在获取同步状态时,AQS 调用 tryAcquire 获取同步状态。AQS 维护一个同步队列,获取同步状态失败的线程都会被加入到队列中并在队列进行自旋(等待);移出队列的条件是前驱节点是头结点且成功获取了同步状态;

在释放同步状态时,AQS 调用 tryRelease 释放同步状态,然后唤醒头节点的后继节点,使其尝试获取同步状态。

# AQS#acquire

acquire(int)可以获取同步状态,对中断不敏感。

  • 1)调用自定义同步器实现的 tryAcquire
  • 2)如果成功,那么结束
  • 3)如果失败,那么调用 addWaiter 加入同步队列尾部,并调用 acquireQueued 获取同步状态(前提是前驱节点为 head)
    • 3.1)如果获取到了,那么将自己设置为头节点,返回
    • 3,2)如果前驱节点不是 head 或者没有获取到,那么判断前驱节点状态是否为 SIGNAL,
    • 3.2.1) 如果是,那么阻塞当前线程,阻塞解除后仍自旋获取同步状态
    • 3.2.2) 如果不是,那么删除状态为 CANCELLED 的前驱节点,将前驱节点状态设置为 SIGNAL,继续自旋尝试获取同步状态。
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
1
2
3
4
5
# addWaiter(新 Node 添加到同步队列尾部,初始状态下 head 是一个空节点)

图片29.jpg 获取同步状态失败的线程会被构造成 Node 加入到同步队列尾部,这个过程必须是线程安全的,AQS 基于 CAS 来设置同步队列的尾节点 compareAndSetTail。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

可能 tail 为 null,或者 tail 不为 null,但 CAS 添加 node 至尾部失败,此时会 enq

如果 tail 为 null,则设置 head 和 tail 都指向一个空节点

然后循环 CAS 添加 node 至尾部,直至成功。

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# acquireQueued

图片30.jpg

  • 设置首节点是由获取同步状态成功的线程来完成的,因为只有一个线程能够成功获取同步状态,因此设置头节点的方法并不需要 CAS 的包装。
  • 如果自己是第二个结点,那么尝试获取同步状态,如果成功,那么将自己设置为头节点,并返回。
  • 如果自己不是第二个结点或者 CAS 获取失败,那么判断是否应该阻塞,如果应该,那么阻塞,否则自旋重新尝试获取同步状态。
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
	// 如果前驱是head,即该结点是第二个结点,那么便有资格去尝试获取资源(可能是head释放完资源唤醒自己的,当然也可能被interrupt了)
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

/**
 * Sets head of queue to be node, thus dequeuing. Called only by
 * acquire methods.  Also nulls out unused fields for sake of GC
 * and to suppress unnecessary signals and traversals.
 *
 * @param node the node
 */
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
```

###### shouldParkAfterFailedAcquire

- 1)如果前一个节点状态是SIGNAL,那么表示已经设置了前驱节点在获取到同步状态时会唤醒自己,就可以放心的去阻塞了。
- 2)否则会检查前一个节点状态是否是Cancelled
- 2.1)如果是,那么就删除前一个节点,直至状态不是Cancelled。
- 2.2)如果不是,那么将其状态设置为SIGNAL。
```
/**
 * Checks and updates status for a node that failed to acquire.
 * Returns true if thread should block. This is the main signal
 * control in all acquire loops.  Requires that pred == node.prev.
 *
 * @param pred node's predecessor holding status
 * @param node the node
 * @return {@code true} if thread should block
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
```
###### parkAndCheckInterrupt

```
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
```

为什么只有前驱节点是头节点才能尝试获取同步状态?
- 1)头节点是成功获取到同步状态的节点,头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头结点
- 2)维护同步队列的FIFO原则。

![图片31.jpg](http://ww1.sinaimg.cn/large/007s8HJUly1g7mebzay7lj30pm0gq40o.jpg)


##### AQS#release

在释放同步状态之后,会唤醒其后继节点,使后继节点继续尝试获取同步状态。

```
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
```

######  unparkSuccessor
```
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
```

#### 共享式同步状态

共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。
![图片32.jpg](http://ww1.sinaimg.cn/large/007s8HJUly1g7mebz74cbj30kq0d3gm8.jpg)

- 左半部分:共享式访问资源时,其他共享式的访问均被允许,而独占式访问被阻塞
- 右半部分:独占式访问资源时,同一时刻其他访问均被阻塞。


##### AQS#acquireShared

AQS会调用tryAcquireShared方法尝试获取同步状态,该方法返回值为int,当返回值大于等于0时,表示能够获取到同步状态。

如果返回值等于0表示当前线程获取共享锁成功,但它后续的线程是无法继续获取的,也就是不需要把它后面等待的节点唤醒。如果返回值大于0,表示当前线程获取共享锁成功且它后续等待的节点也有可能继续获取共享锁成功,也就是说此时需要把后续节点唤醒让它们去尝试获取共享锁。

- 1)调用自定义同步器实现的tryAcquireShared
- 2)如果成功,那么结束
- 3)如果失败,那么调用addWaiter加入SHARED节点至同步队列尾部,并调用再次尝试获取同步状态(前提是前驱节点为head)
- 3.1)如果获取到了,那么将自己设置为头节点,并向后唤醒共享节点(如果还有剩余acquire),返回
- 3.2) 如果前驱节点不是head或者没有获取到,那么判断前驱节点状态是否为SIGNAL
    - 3.2.1) 如果是,那么阻塞当前线程,阻塞解除后仍自旋获取同步状态
    - 3.2.2) 如果不是,那么删除状态为CANCELLED的前驱节点,将前驱节点状态设置为SIGNAL,继续自旋尝试获取同步状态。

```
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
```

##### doAcquiredShared

构造一个当前线程对应的共享节点,如果前驱节点是head并且尝试获取同步状态成功,那么将当前节点设置为head
```
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
```

##### setHeadAndPropagate

如果获取了同步状态,仍有剩余的acquire,那么继续向后唤醒
```
/**
 * Sets head of queue, and checks if successor may be waiting
 * in shared mode, if so propagating if either propagate > 0 or
 * PROPAGATE status was set.
 *
 * @param node the node
 * @param propagate the return value from a tryAcquireShared
 */
private void setHeadAndPropagate(Node node, long propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
// 如果当前节点的后继节点是共享类型或者没有后继节点,则进行唤醒
// 这里可以理解为除非明确指明不需要唤醒(后继等待节点是独占类型),否则都要唤醒
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
```

#### AQS#releaseShared
```
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
```

##### doReleaseShared
```
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
// 表示后继节点需要被唤醒
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
//如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
//如果头结点没有发生变化,表示设置完成,退出循环
//如果头结点发生变化,比如说其他线程获取到了锁,将自己设置为了头节点。为了使自己的唤醒动作可以传递给之后的节点,就需要重新进入循环
        if (h == head)                   // loop if head changed
            break;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294

# 独占式超时获取同步状态

# AQS#tryAcquireNanos
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
1
2
3
4
5
6
7
  • 该方法可以超时获取同步状态,即在指定上的时间段内获取同步状态,如果成功返回 true,失败则返回 false。
  • 对比另一个获取同步状态的方法 acquireInterruptibly,该方法等待时如果被中断,那么会立即返回并抛出 InterruptedException;而 synchronized 即使被中断也仅仅是设置中断标志位,并不会立即返回。
  • 而 tryAcquireNanos 不仅支持响应中断,还增加了超时获取的特性。
  • 针对超时获取,主要需要计算出需要等待的时间间隔 nanosTImeout,为了防止过早通知,nanosTimeout 的计算公式为:nanosTimeout -= now – lastTime。now 是当前唤醒时间,lastTime 为上次唤醒时间。
  • 如果 nanosTimeout 大于 0,则表示超时时间未到,需要继续等待 nanosTimeout 纳秒;反之已经超时。
# AQS#doAcquireNanos

图片33.jpg

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

ReentrantLock 锁是 Java 编程中最重要的同步机制,除了让临界区互斥执行之外,还可以让释放锁的线程向获取锁的线程发送消息。当线程释放锁时,JMM 会把该线程对应的本地 cache 中的共享变量刷新到主存中。当线程获取锁时,JMM 会把该线程对应的本地内存置为无效,从而使得临界区的代码必须从主存中读取共享变量。

对比锁和 volatile 的内存语义可以看出:锁的释放与 volatile 的写操作有相同的内存语义,锁的获取与 volatile 的读操作有相同的内存语义。 公平锁加锁 ReentrantLock#lock public void lock() { sync.lock(); }

FairSync#lock final void lock() { acquire(1); }

FairSync#tryAcquire(重入) 状态值在没有线程持有锁时为 0,有线程持有锁时大于 0

获取状态 1)如果为 0,表示是首次获取,判断同步队列中当前节点是否有前驱节点 1.1)如果有前驱节点,那么说明锁已被其他线程占有,返回失败 1.2) 如果没有前驱节点,那么说明当前节点为 head,CAS 将状态设置为 1 1.2.1) 如果设置成功,那么获取锁成功,将独占锁持有者设置为当前线程 1.2.2) 如果设置失败,那么说明锁竞争失败,返回失败 2)如果不为 0,判断独占锁持有者是否是当前线程 2.1)如果是,那么说明出现了重入,则将状态++ 2.2)如果不是,那么说明锁已被其他线程占有,返回失败

与非公平的 tryAcquire 相比,多了一个方法调用 hasQueuedPredecessors,即加入了同步队列中当前节点是否有前驱节点的判断。如果有前驱节点,那么有线程比当前线程更早地请求锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 首次获取 if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 已被其他线程占有 return false; } hasQueuedPredecessors public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }

公平锁解锁 ReentrantLock#unlock public void unlock() { sync.release(1); }

Sync#tryRelease 1)如果当前线程不是独占锁持有者,则抛出异常。 2)获取状态,将状态值减一,如果减为 0,则将独占锁持有者设置为 null,表示当前线程不再持有这个锁 3)更新状态值

protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }

公平锁总结 在释放锁的最后写 volatile 变量 state,在获取锁时首先读这个 volatile 变量。根据 volatile 的 happens-before 规则,释放锁的线程在写 volatile 变量之前可见的共享内存,在获取锁的线程读取同一个 volatile 变量后将立即变得对获取锁的线程可见。

非公平锁加锁 NonfairSync#lock CAS 同时具有 volatile 读和 volatile 写的内存语义。 底层是基于 CPU 的 cmpxchg 指令实现的,Intel 会规定该指令:禁止该指令与 之前 和 之后 的读写指令重排序;把写缓冲区中的所有数据刷新到内存中。 这一点就足以同时实现 volatile 读和 volatile 写的内存语义了。

final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }

AQS#acquire public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

NonfairLock#tryAcquire protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } NonfairLock#nonfairTryAcquire(重入) final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

ReentrantReadWriteLock 读写状态的设计 读写锁的自定义同步器需要在同步状态上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。 如果在一个整型变量上维护多种状态就需要按位切割使用该变量,读写锁把变量切成了两个部分,高 16 位表示读,低 16 位表示写。

当前同步状态为 S,则写状态=S&0x0000FFFF,读状态=S>>>16。 写状态加一,就是 S+1;读状态加一,就是 S+(1<<16)。 S 不为 0 时,若写状态为 0,则读状态大于 0,读锁已被获取。 写锁的获取与释放 写锁是一个支持重入的排它锁。如果当前线程已经获取了写锁,则增加写状态,如果当前线程在获取写锁时,读锁已经被获取,或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。 WriteLock#lock public void lock() { sync.acquire(1); } Sync#tryAcquire 如果存在读锁,那么写锁不能被获取。因为读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。因此,只有等待其他读线程都是释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,则其他读写线程的后续访问均被阻塞。

获取状态 1)如果状态不为 0,计算写状态 如果写状态为 0(存在读锁)或者独占锁持有者不是当前线程,则返回失败 否则是写线程重入的情况,更新写状态++,返回成功 2)如果状态为 0,如果是公平锁,那么判断当前节点是否有后继节点 2.1)如果有,则返回失败 2.2)如果没有,或者是非公平锁,则 CAS 更新写状态++ 2.2.1)如果更新失败,则返回失败 2.2.2) 如果更新成功,则将独占锁持有者设置为当前线程,返回成功 protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }

writerShouldBlock 如果是公平,那么返回当前节点是否有后继节点,即 hasQueuedPredecessors;如果是非公平,则直接返回 false WriteLock#unlock public void unlock() { sync.release(1); }

Sync#tryRelease protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; } 读锁的获取与释放(放弃) 读锁是一个支持重入的共享锁,它能够被多个线程同时获取,在写状态为 0 时,读锁总会成功地获取,而所做的也只是线程安全地增加读状态。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已经被其他线程获取,则进入等待状态。 在 Java6 中除了保存所有线程获取读锁次数的总和(state 的高 16 位),也保存了每个线程各自获取读锁的次数(ThreadLocal)。

ReadLock#lock public void lock() { sync.acquireShared(1); }

Sync#tryAcquireShared

1)如果有线程持有写锁并且不是当前线程,直接返回失败; 2)获取读状态 2.1)如果是公平锁,那么判断当前节点是否有后继节点 2.1.1)如果有,则执行 fullTryAcquireShared 2.1.2) 如果没有,继续执行 2.2)如果是非公平锁,那么判断 CAS 设置 state 成功,则设置读锁 count 的值。这一步并没有检查读锁重入的情况,被延迟到 fullTryAcquireShared 里了,因为大多数情况下不是重入的; 3.如果步骤 2 失败了,或许是队列策略返回 false 或许是 CAS 设置失败了等,则执行 fullTryAcquireShared。

protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // 有其他写线程,则失败 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); } readerShouldBlock 如果是公平,那么返回当前节点是否有后继节点,即 hasQueuedPredecessors;如果是非公平,则调用 apparentlyFirstQueuedIsExclusive /**

  • Returns {@code true} if the apparent first queued thread, if one
  • exists, is waiting in exclusive mode. If this method returns
  • {@code true}, and the current thread is attempting to acquire in
  • shared mode (that is, this method is invoked from {@link
  • #tryAcquireShared}) then it is guaranteed that the current thread
  • is not the first queued thread. Used only as a heuristic in
  • ReentrantReadWriteLock. */ final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }

fullTryAcquireShared final int fullTryAcquireShared(Thread current) { /* _ This code is in part redundant with that in _ tryAcquireShared but is simpler overall by not _ complicating tryAcquireShared with interactions between _ retries and lazily reading hold counts. */ HoldCounter rh = null; for (;😉 { int c = getState(); if (exclusiveCount(c) != 0) { // 有线程持有写锁且不是当前线程,直接失败 if (getExclusiveOwnerThread() != current) return -1; // 如果队列策略不允许,需要检查是否是读锁重入的情况。队列策略是否允许,分两种情况: // 1.公平模式:如果当前 AQS 队列前面有等待的结点,返回 false;2.非公平模式:如果 // AQS 前面有线程在等待写锁,返回 false(这样做的原因是为了防止写饥饿)。 } else if (readerShouldBlock()) { // 如果当前线程是第一个获取读锁的线程,则有资格获取读锁 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { // 优先赋值成上一次获取读锁成功的 cache,如果发现线程 tid 和当前线程不相等,再从 ThreadLocal 里获取 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } // 说明不是读锁重入的情况,直接返回失败了 if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { // 设置当前线程为第一个获取读锁的线程 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; // 读锁重入 } else if (firstReader == current) { firstReaderHoldCount++; } else { // 其他获取读锁成功的情况 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }

锁降级 锁降级是指把持有写锁,再获取到读锁,随后释放写锁的过程。

锁降级中读锁的获取是否必要?主要是为了保证数据的可见性。如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程 T 获取了写锁并修改了数据,那么当前线程无法感知线程 T 的数据更新。如果当前线程获取读锁,即遵循锁降级的步骤,则线程 T 将会被阻塞。

LockSupport public static void park() { UNSAFE.park(false, 0L); }

/**

  • Block current thread, returning when a balancing
  • <tt>unpark </tt> occurs, or a balancing <tt>unpark </tt> has
  • already occurred, or the thread is interrupted, or, if not
  • absolute and time is not zero, the given time nanoseconds have
  • elapsed, or if absolute, the given deadline in milliseconds
  • since Epoch has passed, or spuriously (i.e., returning for no
  • "reason"). Note: This operation is in the Unsafe class only
  • because <tt>unpark </tt> is, so it would be strange to place it
  • elsewhere. */ public native void park(boolean isAbsolute, long time);

UnsafePark UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv _env, jobject unsafe, jboolean isAbsolute, jlong time)) UnsafeWrapper("Unsafe_Park"); EventThreadPark event; #ifndef USDT2 HS_DTRACE_PROBE3(hotspot, threadparkbegin, thread->parker(), (int) isAbsolute, time); #else / USDT2 / HOTSPOT_THREAD_PARK_BEGIN( (uintptr_t) thread->parker(), (int) isAbsolute, time); #endif / USDT2 / JavaThreadParkedState jtps(thread, time != 0); thread->parker()->park(isAbsolute != 0, time); #ifndef USDT2 HS_DTRACE_PROBE1(hotspot, threadparkend, thread->parker()); #else / USDT2 / HOTSPOT_THREAD_PARK_END( (uintptr_t) thread->parker()); #endif / USDT2 _/ if (event.should_commit()) { oop obj = thread->current_park_blocker(); event.set_klass((obj != NULL) ? obj->klass() : NULL); event.set_timeout(time); event.set_address((obj != NULL) ? (TYPE_ADDRESS) cast_from_oop<uintptr_t>(obj) : 0); event.commit(); } UNSAFE_END Parker 定义私有属性_counter:可以理解为是否可以调用 park 的一个许可证,只有_count > 0 的时候才能调用; 提供 public 方法 park 和 unpark 支撑阻塞/唤醒线程; Parker 继承 PlatformParker class Parker : public os::PlatformParker { private: volatile int _counter ; Parker _ FreeNext ; JavaThread * AssociatedWith ; // Current association

public: Parker() : PlatformParker() { _counter = 0 ; FreeNext = NULL ; AssociatedWith = NULL ; } protected: ~Parker() { ShouldNotReachHere(); } public: // For simplicity of interface with Java, all forms of park (indefinite, // relative, and absolute) are multiplexed into one call. void park(bool isAbsolute, jlong time); void unpark();

// Lifecycle operators static Parker _ Allocate (JavaThread _ t) ; static void Release (Parker _ e) ; private: static Parker _ volatile FreeList ; static volatile int ListLock ;

};

Linux#PlatformParker linux 下的 PlatformParker,基于 POSIX 的线程编写的。 POSIX 线程(POSIX threads),简称 Pthreads,是线程的 POSIX 标准。该标准定义了创建和操纵线程的一整套 API。在类 Unix 操作系统(Unix、Linux、Mac OS X 等)中,都使用 Pthreads 作为操作系统的线程。Windows 操作系统也有其移植版 pthreads-win32   class PlatformParker : public CHeapObj <mtInternal> { protected: enum { REL_INDEX = 0, ABS_INDEX = 1 }; int _cur_index; // which cond is in use: -1, 0, 1 pthread_mutex_t _mutex [1] ; pthread_cond_t _cond [2] ; // one for relative times and one for abs.

public: // TODO-FIXME: make dtor private ~PlatformParker() { guarantee (0, "invariant") ; }

public: PlatformParker() { int status; status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr()); assert_status(status == 0, status, "cond_init rel"); status = pthread_cond_init (&_cond[ABS_INDEX], NULL); assert_status(status == 0, status, "cond_init abs"); status = pthread_mutex_init (_mutex, NULL); assert_status(status == 0, status, "mutex_init"); _cur_index = -1; // mark as unused } }; Parker#park 用 mutex 和 condition 保护了一个_counter 的变量,当 park 时,这个变量置为了 0,当 unpark 时,这个变量置为 1。

1、先尝试使用 Atomic 的 xchg,CAS 查看 counter 是否大于 0,如果是,那么更新为 0,返回 2、构造一个 ThreadBlockInVM,判断如果_counter > 0,可以调用,将_counter 置为 0,,unlock mutex,返回 3、根据等待时间调用不同的等待函数等待,如果等待返回正确,将_counter 置为 0,unlock mutex,返回,park 调用成功。 void Parker::park(bool isAbsolute, jlong time) { // Ideally we'd do something useful while spinning, such // as calling unpackTime().

// Optional fast-path check: // Return immediately if a permit is available. // We depend on Atomic::xchg() having full barrier semantics // since we are doing a lock-free update to _counter. if (Atomic::xchg(0, &_counter) > 0) return;

Thread* thread = Thread::current(); assert(thread->is_Java_thread(), "Must be JavaThread"); JavaThread *jt = (JavaThread *)thread;

// Optional optimization -- avoid state transitions if there's an interrupt pending. // Check interrupt before trying to wait if (Thread::is_interrupted(thread, false)) { return; }

// Next, demultiplex/decode time arguments timespec absTime; if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all return; } if (time > 0) { unpackTime(&absTime, isAbsolute, time); }

// Enter safepoint region // Beware of deadlocks such as 6317397. // The per-thread Parker:: mutex is a classic leaf-lock. // In particular a thread must never block on the Threads_lock while // holding the Parker:: mutex. If safepoints are pending both the // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock. ThreadBlockInVM tbivm(jt);

// Don't wait if cannot get lock since interference arises from // unblocking. Also. check interrupt before trying wait if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { return; }

int status ; if (_counter > 0) { // no wait needed _counter = 0; status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; // Paranoia to ensure our locked and lock-free paths interact // correctly with each other and Java-level accesses. OrderAccess::fence(); return; }

#ifdef ASSERT // Don't catch signals while blocked; let the running threads have the signals. // (This allows a debugger to break into the running thread.) sigset_t oldsigs; sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals(); pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs); #endif

OSThreadWaitState osts(thread->osthread(), false /_ not Object.wait() _/); jt->set_suspend_equivalent(); // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()

assert(_cur_index == -1, "invariant"); if (time == 0) { _cur_index = REL_INDEX; // arbitrary choice when not timed status = pthread_cond_wait (&_cond[_cur_index], _mutex) ; } else { _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX; status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ; if (status != 0 && WorkAroundNPTLTimedWaitHang) { pthread_cond_destroy (&_cond[_cur_index]) ; pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr()); } } _cur_index = -1; assert_status(status == 0 || status == EINTR || status == ETIME || status == ETIMEDOUT, status, "cond_timedwait");

#ifdef ASSERT pthread_sigmask(SIG_SETMASK, &oldsigs, NULL); #endif

_counter = 0 ; status = pthread_mutex_unlock(_mutex) ; assert_status(status == 0, status, "invariant") ; // Paranoia to ensure our locked and lock-free paths interact // correctly with each other and Java-level accesses. OrderAccess::fence();

// If externally suspended while waiting, re-suspend if (jt->handle_special_suspend_equivalent_condition()) { jt->java_suspend_self(); } }

Parker#unpark 将_counter 置为 1; 判断之前_counter 的值: 小于 1 时,调用 pthread_cond_signal 唤醒在 park 中等待的线程,unlock mutex; 等于 1 时,unlock mutex,返回。 void Parker::unpark() { int s, status ; status = pthread_mutex_lock(_mutex); assert (status == 0, "invariant") ; s = _counter; _counter = 1; if (s < 1) { // thread might be parked if (_cur_index != -1) { // thread is definitely parked if (WorkAroundNPTLTimedWaitHang) { status = pthread_cond_signal (&_cond[_cur_index]); assert (status == 0, "invariant"); status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant"); } else { status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant"); status = pthread_cond_signal (&_cond[_cur_index]); assert (status == 0, "invariant"); } } else { pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } } else { pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } }

wait&notify(忽略) 1)使用 wait、notify、notifyAll 时需要先对调用对象加锁 2)调用 wait 方法后,会放弃对象的锁,线程状态由运行态转为等待态,并将当前线程放置到对象的等待队列 3)notify 或 notifyAll 方法调用后,等待线程不会从 wait 返回,需要调用 notify 或 notifyAll 的线程释放锁后,等待线程才有机会从 wait 返回 4)notify 方法是将等待队列中的一个等待线程能从等待队列移至同步队列中,notifyAll 方法是将等待队列中的所有线程全部移至同步队列中,被移动的线程状态由等待态转为阻塞态 5)从 wait 方法返回的前提是获得了调用对象的锁

wait&notify 前提也是基于 monitorenter、monitorexit 指令实现的(对应 1))。

WaitThread 首先获取了对象的锁,然后调用对象的 wait 方法,从而放弃了锁,并进入了对象的等待队列 WaitQueue 中,进入等待状态。 由于 WaitThread 释放了对象的锁,NotifyThread 随后获取了对象的锁,并调用对象的 notify 方法,将 WaitThrad 从等待队列 WaitQueue 移到同步队列 SynchronizedQueue 中,此时 WaitThread 状态变为阻塞态。NotifyThread 释放了锁之后,WaitThread 再次获取到了锁,并从 wait 方法返回继续执行。 ObjectMonitor#wait 在 HotSpot 虚拟机中,monitor 采用 ObjectMonitor 实现。 void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) { Thread * const Self = THREAD ; assert(Self->is_Java_thread(), "Must be Java thread!"); JavaThread *jt = (JavaThread *)THREAD;

DeferredInitialize () ;

// Throw IMSX or IEX. CHECK_OWNER();

EventJavaMonitorWait event;

// check for a pending interrupt if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) { // post monitor waited event. Note that this is past-tense, we are done waiting. if (JvmtiExport::should_post_monitor_waited()) { // Note: 'false' parameter is passed here because the // wait was not timed out due to thread interrupt. JvmtiExport::post_monitor_waited(jt, this, false); } if (event.should_commit()) { post_monitor_wait_event(&event, 0, millis, false); } TEVENT (Wait - Throw IEX) ; THROW(vmSymbols::java_lang_InterruptedException()); return ; }

TEVENT (Wait) ;

assert (Self->_Stalled == 0, "invariant") ; Self->_Stalled = intptr_t(this) ; jt->set_current_waiting_monitor(this);

// create a node to be put into the queue // Critically, after we reset() the event but prior to park(), we must check // for a pending interrupt. ObjectWaiter node(Self); node.TState = ObjectWaiter::TS_WAIT ; Self->_ParkEvent->reset() ; OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag

// Enter the waiting queue, which is a circular doubly linked list in this case // but it could be a priority queue or any data structure. // _WaitSetLock protects the wait queue. Normally the wait queue is accessed only // by the the owner of the monitor except in the case where park() // returns because of a timeout of interrupt. Contention is exceptionally rare // so we use a simple spin-lock instead of a heavier-weight blocking lock.

Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ; AddWaiter (&node) ; Thread::SpinRelease (&_WaitSetLock) ;

if ((SyncFlags & 4) == 0) { _Responsible = NULL ; } intptr_t save = _recursions; // record the old recursion count _waiters++; // increment the number of waiters _recursions = 0; // set the recursion level to be 1 exit (true, Self) ; // exit the monitor guarantee (_owner != Self, "invariant") ;

// As soon as the ObjectMonitor's ownership is dropped in the exit() // call above, another thread can enter() the ObjectMonitor, do the // notify(), and exit() the ObjectMonitor. If the other thread's // exit() call chooses this thread as the successor and the unpark() // call happens to occur while this thread is posting a // MONITOR_CONTENDED_EXIT event, then we run the risk of the event // handler using RawMonitors and consuming the unpark(). // // To avoid the problem, we re-post the event. This does no harm // even if the original unpark() was not consumed because we are the // chosen successor for this monitor. if (node._notified != 0 && _succ == Self) { node._event->unpark(); }

// The thread is on the WaitSet list - now park() it. // On MP systems it's conceivable that a brief spin before we park // could be profitable. // // TODO-FIXME: change the following logic to a loop of the form // while (!timeout && !interrupted && _notified == 0) park()

int ret = OS_OK ; int WasNotified = 0 ; { // State transition wrappers OSThread* osthread = Self->osthread(); OSThreadWaitState osts(osthread, true); { ThreadBlockInVM tbivm(jt); // Thread is in thread_blocked state and oop access is unsafe. jt->set_suspend_equivalent();

if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
       // Intentionally empty
   } else
   if (node._notified == 0) {
     if (millis <= 0) {
        Self->_ParkEvent->park () ;
     } else {
        ret = Self->_ParkEvent->park (millis) ;
     }
   }

// were we externally suspended while we were waiting?
   if (ExitSuspendEquivalent (jt)) {
      // TODO-FIXME: add -- if succ == Self then succ = null.
      jt->java_suspend_self();
   }

} // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm

// Node may be on the WaitSet, the EntryList (or cxq), or in transition
 // from the WaitSet to the EntryList.
 // See if we need to remove Node from the WaitSet.
 // We use double-checked locking to avoid grabbing _WaitSetLock
 // if the thread is not on the wait queue.
 //
 // Note that we don't need a fence before the fetch of TState.
 // In the worst case we'll fetch a old-stale value of TS_WAIT previously
 // written by the is thread. (perhaps the fetch might even be satisfied
 // by a look-aside into the processor's own store buffer, although given
 // the length of the code path between the prior ST and this load that's
 // highly unlikely).  If the following LD fetches a stale TS_WAIT value
 // then we'll acquire the lock and then re-fetch a fresh TState value.
 // That is, we fail toward safety.

if (node.TState == ObjectWaiter::TS_WAIT) {
     Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
     if (node.TState == ObjectWaiter::TS_WAIT) {
        DequeueSpecificWaiter (&node) ;       // unlink from WaitSet
        assert(node._notified == 0, "invariant");
        node.TState = ObjectWaiter::TS_RUN ;
     }
     Thread::SpinRelease (&_WaitSetLock) ;
 }

// The thread is now either on off-list (TS_RUN),
 // on the EntryList (TS_ENTER), or on the cxq (TS_CXQ).
 // The Node's TState variable is stable from the perspective of this thread.
 // No other threads will asynchronously modify TState.
 guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ;
 OrderAccess::loadload() ;
 if (_succ == Self) _succ = NULL ;
 WasNotified = node._notified ;

// Reentry phase -- reacquire the monitor.
 // re-enter contended monitor after object.wait().
 // retain OBJECT_WAIT state until re-enter successfully completes
 // Thread state is thread_in_vm and oop access is again safe,
 // although the raw address of the object may have changed.
 // (Don't cache naked oops over safepoints, of course).

// post monitor waited event. Note that this is past-tense, we are done waiting.
 if (JvmtiExport::should_post_monitor_waited()) {
   JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT);
 }

if (event.should_commit()) {
   post_monitor_wait_event(&event, node._notifier_tid, millis, ret == OS_TIMEOUT);
 }

OrderAccess::fence() ;

assert (Self->_Stalled != 0, "invariant") ;
 Self->_Stalled = 0 ;

assert (_owner != Self, "invariant") ;
 ObjectWaiter::TStates v = node.TState ;
 if (v == ObjectWaiter::TS_RUN) {
     enter (Self) ;
 } else {
     guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
     ReenterI (Self, &node) ;
     node.wait_reenter_end(this);
 }

// Self has reacquired the lock.
 // Lifecycle - the node representing Self must not appear on any queues.
 // Node is about to go out-of-scope, but even if it were immortal we wouldn't
 // want residual elements associated with this thread left on any lists.
 guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ;
 assert    (_owner == Self, "invariant") ;
 assert    (_succ != Self , "invariant") ;

} // OSThreadWaitState()

jt->set_current_waiting_monitor(NULL);

guarantee (_recursions == 0, "invariant") ; _recursions = save; // restore the old recursion count _waiters--; // decrement the number of waiters

// Verify a few postconditions assert (_owner == Self , "invariant") ; assert (_succ != Self , "invariant") ; assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;

if (SyncFlags & 32) { OrderAccess::fence() ; }

// check if the notification happened if (!WasNotified) { // no, it could be timeout or Thread.interrupt() or both // check for interrupt event, otherwise it is timeout if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) { TEVENT (Wait - throw IEX from epilog) ; THROW(vmSymbols::java_lang_InterruptedException()); } }

// NOTE: Spurious wake up will be consider as timeout. // Monitor notify has precedence over thread interrupt. }

Condition 每个 Condition 对象都包含着一个等待队列,该队列是 Condition 对象实现等待/通知功能的关键。 Condition 的实现类是 AQS 的内部类 ConditionObject。 ConditionObject /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; 等待队列 等待队列是一个 FIFO 的队列,在队列的每个节点上都包含了一个线程引用,该线程就是在 Condition 对象上等待的线程,如果一个线程调用了 Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。 Condition 拥有等待队列的首节点 firstWaiter 和尾节点 lastWaiter。 每个 Node 都持有同一个队列中下一个 Node 的引用。

Condition 拥有首尾节点的引用,新增节点时仅需将原有的尾节点的 nextWaiter 指针指向它, 并更新尾节点即可。上述节点引用更新的过程并没有使用 CAS 保证,因为调用 await 方法的线程必定时获取了锁的线程。 在 Object 的监视器模型上,一个对象拥有一个同步队列和一个等待队列,而并发包中的 Lock 拥有一个同步队列和多个等待队列。

Condition 是 AQS 同步器的内部类,所以每个 Conditions 实例都能访问同步器提供的方法。

AQS 维护了一个同步队列,一个 AQS 对应多个 Condition,每个 Condition 维护了一个等待队列。

ConditionObject#await

public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 当前线程构造为 Node,加入到等待队列 Node node = addConditionWaiter(); // 释放锁,唤醒同步队列中的后继节点 long savedState = fullyRelease(node); int interruptMode = 0; // 阻塞,直至被其他线程唤醒或中断 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 重新获取锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 移出等待队列 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) // 被其他线程中断,抛出 InterruptedException 异常 reportInterruptAfterWait(interruptMode); } ConditionObject#addConditionWaiter private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } AQS#fullyRelease /**

  • Invokes release with current state value; returns saved state.
  • Cancels node and throws exception on failure.
  • @param node the condition node for this wait
  • @return previous sync state */ final long fullyRelease(Node node) { boolean failed = true; try { long savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } AQS#isOnSyncQueue /**
  • Returns true if a node, always one that was initially placed on
  • a condition queue, is now waiting to reacquire on sync queue.
  • @param node the node
  • @return true if is reacquiring / final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; /
    • node.prev can be non-null, but not yet on queue because
    • the CAS to place it on queue can fail. So we have to
    • traverse from tail to make sure it actually made it. It
    • will always be near the tail in calls to this method, and
    • unless the CAS failed (which is unlikely), it will be
    • there, so we hardly ever traverse much. */ return findNodeFromTail(node); }

/**

  • Returns true if node is on sync queue by searching backwards from tail.
  • Called only when needed by isOnSyncQueue.
  • @return true if present */ private boolean findNodeFromTail(Node node) { Node t = tail; for (;😉 { if (t == node) return true; if (t == null) return false; t = t.prev; } }

ConditionObject#checkInterruptWhileWaiting /**

  • Checks for interrupt, returning THROW_IE if interrupted
  • before signalled, REINTERRUPT if after signalled, or
  • 0 if not interrupted. */ private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }

AQS#acquireQueued /**

  • Acquires in exclusive uninterruptible mode for thread already in
  • queue. Used by condition wait methods as well as acquire.
  • @param node the node
  • @param arg the acquire argument
  • @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, long arg) { boolean failed = true; try { boolean interrupted = false; for (;😉 { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

ConditionObject#unlinkCancelledWaiters /**

  • Unlinks cancelled waiter nodes from condition queue.
  • Called only while holding lock. This is called when
  • cancellation occurred during condition wait, and upon
  • insertion of a new waiter when lastWaiter is seen to have
  • been cancelled. This method is needed to avoid garbage
  • retention in the absence of signals. So even though it may
  • require a full traversal, it comes into play only when
  • timeouts or cancellations occur in the absence of
  • signals. It traverses all nodes rather than stopping at a
  • particular target to unlink all pointers to garbage nodes
  • without requiring many re-traversals during cancellation
  • storms. */ private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }

ConditionObject#reportInterruptAfterWait private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }

ConditionObject#signal 首先判断当前线程是否获取了锁,然后获取等待队列的首节点,将其移动到同步队列,并使用 LockSupport.unpark 唤醒节点中的线程。

public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }

/**

  • Removes and transfers nodes until hit non-cancelled one or
  • null. Split out from signal in part to encourage compilers
  • to inline the case of no waiters.
  • @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }

AQS#transferForSignal /**

  • Transfers a node from a condition queue onto sync queue.

  • Returns true if successful.

  • @param node the node

  • @return true if successfully transferred (else the node was

  • cancelled before signal) / final boolean transferForSignal(Node node) { /

    • If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false;

    /*

    • Splice onto queue and try to set waitStatus of predecessor to
    • indicate that thread is (probably) waiting. If cancelled or
    • attempt to set waitStatus fails, wake up to resync (in which
    • case the waitStatus can be transiently and harmlessly wrong). */ // 等待队列中的头节点移动到了同步队列 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 唤醒该节点的线程 LockSupport.unpark(node.thread); return true; }

Semaphore(暂缓) CyclicBarrier(暂缓) CountDownLatch(暂缓) Exchanger(暂缓) AtomicInteger /**

  • Atomically increments by one the current value.
  • @return the updated value */ public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; }

/**

  • Atomically adds the given value to the current value of a field
  • or array element within the given object <code>o </code>
  • at the given <code>offset </code>.
  • @param o object/array to update the field/element in
  • @param offset field/element offset
  • @param delta the value to add
  • @return the previous value
  • @since 1.8 */ public final int getAndAddInt(Object o, long offset, int delta) { int v; do { v = getIntVolatile(o, offset); } while (!compareAndSwapInt(o, offset, v, v + delta)); return v; }

/*_ Volatile version of {@link #getInt(Object, long)} _/ public native int getIntVolatile(Object o, long offset);

/**

  • Atomically update Java variable to <tt>x </tt> if it is currently
  • holding <tt>expected </tt>.
  • @return <tt>true </tt> if successful */ public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);

ThreadPoolExeuctor 线程池中持有一组 Runnable,称为 Worker,包装为 Thread,调用 Thread#start(作为一个线程去启动)。它们的 run 方法是一个循环,不断获取用户提交的 Runnable 并调用 Runnable#run(不是启动线程,仅仅是方法调用)。 状态转换

成员变量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private final BlockingQueue <Runnable> workQueue;

private final ReentrantLock mainLock = new ReentrantLock();

private final HashSet <Worker> workers = new HashSet <Worker>();

private final Condition termination = mainLock.newCondition();

private int largestPoolSize;

private long completedTaskCount;

private volatile ThreadFactory threadFactory;

private volatile RejectedExecutionHandler handler;

private volatile long keepAliveTime;

private volatile boolean allowCoreThreadTimeOut;

private volatile int corePoolSize;

private volatile int maximumPoolSize;

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");

/_ The context to be used when executing the finalizer, or null. _/ private final AccessControlContext acc;

一个 ctl 变量可以包含两部分信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount). 由于 int 型的变量是由 32 位二进制的数构成, 所以用 ctl 的高 3 位来表示线程池的运行状态, 用低 29 位来表示线程池内有效线程的数量. 由于这两部分信息在该类中很多地方都会使用到, 所以我们也经常会涉及到要获取其中一个信息的操作, 通常来说, 代表这两个信息的变量的名称直接用他们各自英文单词首字母的组合来表示, 所以, 表示线程池运行状态的变量通常命名为 rs, 表示线程池中有效线程数量的变量通常命名为 wc, 另外, ctl 也通常会简写作 c。 由于 ctl 变量是由线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)这两个信息组合而成, 所以, 如果知道了这两部分信息各自的数值, 就可以调用下面的 ctlOf() 方法来计算出 ctl 的数值:

// rs: 表示线程池的运行状态 (rs 是 runState 中各单词首字母的简写组合) // wc: 表示线程池内有效线程的数量 (wc 是 workerCount 中各单词首字母的简写组合) private static int ctlOf(int rs, int wc) { return rs | wc; } 反过来, 如果知道了 ctl 的值, 那么也可以通过如下的 runStateOf() 和 workerCountOf() 两个方法来分别获取线程池的运行状态和线程池内有效线程的数量. private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } 其中, CAPACITY 等于 (2^29)-1, 也就是高 3 位是 0, 低 29 位是 1 的一个 int 型的数, private static final int COUNT_BITS = Integer.SIZE - 3; // 29 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // COUNT_BITS == 29 执行任务

当提交一个新任务到线程池时,线程池的处理流程如下: 1)线程池判断当前运行的线程是否少于 corePoolSize(需要获取全局锁),如果不是,则创建一个新的工作线程来执行任务(分配线程)。如果是,则进入 2) 2)线程池判断工作队列是否已经满了,如果没有满,则将新提交的任务存储到这个工作队列 BlockingQueue 里。如果满了,则进入 3) 3)线程池判断创建新的线程是否会使当前运行的线程超过 maxPoolSize(需要获取全局锁),如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。

ThreadPoolExecutor 采取上述步骤的总体设计思路,是为了在执行 execute 方法时,尽可能地避免获取全局锁。在 ThreadPoolExecutor 完成预热之后(当前运行的线程数大于等于 corePoolSize),几乎所有的 execute 方法都是执行步骤 2,步骤 2 不需要获取全局锁。 execute(Runnable 不进行任何封装) public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 如果当前运行线程数小于 corePoolSize if (workerCountOf(c) < corePoolSize) { // 创建线程并执行当前任务 if (addWorker(command, true)) return; c = ctl.get(); } // 如果线程池处于运行状态,且(当前运行线程数大于等于 corePoolSize 或线程创建失败),则将当前任务放入工作队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次检查线程池的状态,如果线程池没有运行,且成功从工作队列中删除任务,则执行 reject 处理任务 if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果线程池不处于运行中或任务无法放进队列,并且当前线程数小于 maxPoolSize,则创建一个线程执行任务 else if (!addWorker(command, false)) // 创建线程失败,则执行 reject 处理任务 reject(command); }

  1. addWorker addWorker 主要负责创建新的线程并执行任务 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;😉 { int c = ctl.get(); int rs = runStateOf(c);

    // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;

    for (;😉 { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }

    boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get());

    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

1.1) Worker(基于 AQS) 当 addWorker 中调用 t.start()时,这个 t 是 Worker 构造方法中使用 ThreadFactory 创建出来的 Thread,且将 this 作为 Runnable 传入,启动 t 时会调用 Worker#run 方法。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** _ This class will never be serialized, but we provide a _ serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L;

/** Thread this worker is running in.  Null if factory fails.*/
final Thread thread;
/** Initial task to run.  Possibly null.*/
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

/**
 * Creates with given first task and thread from ThreadFactory.
 * @param firstTask the first task (null if none)
 */
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker  */
public void run() {
    runWorker(this);
}
// ...

}

1.2) Worker#runWorker final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

1.2.1) ThreadPoolExecutor#getTask 1)判断线程池是否已经关闭,如果关闭,则退出循环 2)根据当前 Worker 是否超时,对工作队列调用 poll 或 take 方法,进行阻塞。阻塞中的线程就是空闲线程。

当调用 shutdown 方法时,首先设置了线程池的状态为 ShutDown,此时 1 阶段的 worker 进入到状态判断时会返回 null,此时 Worker 退出。 因为 getTask 的时候是不加锁的,所以在 shutdown 时可以调用 worker.Interrupt.此时会中断退出,Loop 到状态判断时,同时 workQueue 为 empty。那么抛出中断异常,导致重新 Loop,在检测线程池状态时,Worker 退出。如果 workQueue 不为 null 就不会退出,此处有些疑问,因为没有看见中断标志位清除的逻辑,那么这里就会不停的循环直到 workQueue 为 Empty 退出。 这里也能看出来 SHUTDOWN 只是清除一些空闲 Worker,并且拒绝新 Task 加入,对于 workQueue 中的线程还是继续处理的。 对于 shutdown 中获取 mainLock 而 addWorker 中也做了 mainLock 的获取,这么做主要是因为 Works 是 HashSet 类型的,是线程不安全的,我们也看到在 addWorker 后面也是对线程池状态做了判断,将 Worker 添加和中断逻辑分离开。

timed 变量主要是标识着当前 Worker 超时是否要退出。wc > corePoolSize 时需要减小空闲的 Worker 数,那么 timed 为 true,但是 wc <= corePoolSize 时,不能减小核心线程数 timed 为 false。 timedOut 初始为 false,如果 timed 为 true 那么使用 poll 取线程。如果正常返回,那么返回取到的 task。如果超时,证明 worker 空闲,同时 worker 超过了 corePoolSize,需要删除。返回 r=null。则 timedOut = true。此时循环到 wc <= maximumPoolSize && ! (timedOut && timed)时,减小 worker 数,并返回 null,导致 worker 退出。如果线程数<= corePoolSize,那么此时调用 workQueue.take(),没有线程获取到时将一直阻塞,直到获取到线程或者中断。 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out?

for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

// Check if queue empty only if necessary.
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
        decrementWorkerCount();
        return null;
    }

int wc = workerCountOf(c);
    // Worker是否要减少
    // Are workers subject to culling?
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
        && (wc > 1 || workQueue.isEmpty())) {
        if (compareAndDecrementWorkerCount(c))
            return null;
        continue;
    }
    // 如果要减少Worker的话,如果在keepAliveTime内没有拿到任务,那么设置为超时,下次循环被会移除;如果不需要减少Worker,那么阻塞获取任务
    try {
        Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
        if (r != null)
            return r;
        timedOut = true;
    } catch (InterruptedException retry) {
        timedOut = false;
    }
}

}

  1. reject final void reject(Runnable command) { handler.rejectedExecution(command, this); }

2.1) AbortPolicy#rejectedExecution public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }

2.2) DiscardPolicy#rejectedExecution public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }

2.3) DiscardOldestPolicy#rejectedExecution public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }

2.4) CallerRunsPolicy#rejectedExecution public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }

submit(Callable 包装为 FutureTask) public <T> Future <T> submit(Callable <T> task) { if (task == null) throw new NullPointerException(); RunnableFuture <T> ftask = newTaskFor(task); execute(ftask); return ftask; }

  1. newTaskFor(将 Callable 包装成 Runnable+Future,Runnable 可以放在 ThreadPoolExecutor 中执行) protected <T> RunnableFuture <T> newTaskFor(Callable <T> callable) { return new FutureTask <T>(callable); }

public FutureTask(Callable <V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }

1.1) FutureTask#run public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable <V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } 1.1.1) FutureTask#set protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } 1.1.2) FutureTask#setException protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }

1.1.1.1) FutureTask#finishCompletion(唤醒 Waiter) private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;😉 { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } }

done();

callable = null;        // to reduce footprint

}

  1. FutureTask#get public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) // 如果尚未执行完毕,则等待 s = awaitDone(false, 0L); return report(s); }

/**

  • Simple linked list nodes to record waiting threads in a Treiber

  • stack. See other classes such as Phaser and SynchronousQueue

  • for more detailed explanation. */ static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }

    2.1) FutureTask#awaitDone(添加并阻塞 Waiter) private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;😉 { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); }

    int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued)

    // 添加 Waiter,以待被唤醒 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }

    2.2) FutureTask#report(抛出执行时的异常) private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }

  1. FutureTask#cancel(实际上还是中断线程) public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }

关闭线程池 shutdown 方法会遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终止。 shutdownNow 方法会首先将线程池的状态设置为 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的状态设置为 SHUTDOWN 状态,然后中断所有没有在执行任务的线程。 只要调用了这两个关闭方法中的任意一个,isShutdown 方法就会返回 true。当所有的任务都关闭后,才表示线程池关闭成功,这时调用 isTerminated 方法会返回 true。 shutdown

public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 判断是否可以操作目标线程 checkShutdownAccess(); // 设置线程池状态为 SHUTDOWN,以后线程池不会执行新的任务 advanceRunState(SHUTDOWN); // 中断所有的空闲线程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 转到 Terminate tryTerminate(); }

  1. interruptIdleWorkers private void interruptIdleWorkers() { interruptIdleWorkers(false); } 中断 worker,但是中断之前需要先获取锁,这就意味着正在运行的 Worker 不能中断。但是上面的代码有 w.tryLock(),那么获取不到锁就不会中断,shutdown 的 Interrupt 只是对所有的空闲 Worker(正在从 workQueue 中取 Task,此时 Worker 没有加锁)发送中断信号。 private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { //tryLock 能获取到的 Worker 都是空闲的 Worker,因为 Worker 在执行任务时是要拿到 Worker 的 Lock 的 try { // 让阻塞在工作队列中的 Worker 中断 t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }

shutdownNow public List <Runnable> shutdownNow() { List <Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); // 工作队列中没有执行的任务全部抛弃 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }

  1. interruptWorkers private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }

Worker#interruptIfStarted void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }

1.1) drainQueue */ private List <Runnable> drainQueue() { BlockingQueue <Runnable> q = workQueue; ArrayList <Runnable> taskList = new ArrayList <Runnable>(); q.drainTo(taskList); if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; }

1.2) tryTerminate(两种关闭都会调用)TIDYING 和 TERMINATED 的转化 有几种状态是不能转化到 TIDYING(整理中)的: RUNNING 状态 TIDYING 或 TERMINATED SHUTDOWN 状态,但是 workQueue 不为空

也说明了两点:

  1. SHUTDOWN 想转化为 TIDYING,需要 workQueue 为空,同时 workerCount 为 0。

  2. STOP 转化为 TIDYING,需要 workerCount 为 0 如果满足上面的条件(一般一定时间后都会满足的),那么 CAS 成 TIDYING,TIDYING 也只是个过渡状态,最终会转化为 TERMINATED。 final void tryTerminate() { for (;😉 { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; }

    final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }

ScheduledThreadPoolExecutor(继承 ThreadPoolExecutor) 与 ThreadPoolExecutor 的区别: 1)使用 DelayedWorkQueue 作为任务队列 2)获取任务的方式不同 3)执行周期任务后,增加了额外的处理 成员变量 /**

  • False if should cancel/suppress periodic tasks on shutdown. */ private volatile boolean continueExistingPeriodicTasksAfterShutdown;

/**

  • False if should cancel non-periodic tasks on shutdown. */ private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

/**

  • True if ScheduledFutureTask.cancel should remove from queue */ private volatile boolean removeOnCancel = false;

/**

  • Sequence number to break scheduling ties, and in turn to
  • guarantee FIFO order among tied entries. */ private static final AtomicLong sequencer = new AtomicLong();

构造方法 /**

  • Creates a new {@code ScheduledThreadPoolExecutor} with the
  • given core pool size.
  • @param corePoolSize the number of threads to keep in the pool, even
  • if they are idle, unless {@code allowCoreThreadTimeOut} is set
  • @throws IllegalArgumentException if {@code corePoolSize < 0} */ public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }

/**

  • Creates a new {@code ScheduledThreadPoolExecutor} with the
  • given initial parameters.
  • @param corePoolSize the number of threads to keep in the pool, even
  • if they are idle, unless {@code allowCoreThreadTimeOut} is set
  • @param threadFactory the factory to use when the executor
  • creates a new thread
  • @throws IllegalArgumentException if {@code corePoolSize < 0}
  • @throws NullPointerException if {@code threadFactory} is null */ public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); }

/**

  • Creates a new ScheduledThreadPoolExecutor with the given
  • initial parameters.
  • @param corePoolSize the number of threads to keep in the pool, even
  • if they are idle, unless {@code allowCoreThreadTimeOut} is set
  • @param handler the handler to use when execution is blocked
  • because the thread bounds and queue capacities are reached
  • @throws IllegalArgumentException if {@code corePoolSize < 0}
  • @throws NullPointerException if {@code handler} is null */ public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); }

/**

  • Creates a new ScheduledThreadPoolExecutor with the given
  • initial parameters.
  • @param corePoolSize the number of threads to keep in the pool, even
  • if they are idle, unless {@code allowCoreThreadTimeOut} is set
  • @param threadFactory the factory to use when the executor
  • creates a new thread
  • @param handler the handler to use when execution is blocked
  • because the thread bounds and queue capacities are reached
  • @throws IllegalArgumentException if {@code corePoolSize < 0}
  • @throws NullPointerException if {@code threadFactory} or
  • {@code handler} is null */ public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }

DelayedWorkQueue(底层是堆,无界阻塞队列,存放 RunnableScheduledFuture) static class DelayedWorkQueue extends AbstractQueue <Runnable> implements BlockingQueue <Runnable> {}

put public void put(Runnable e) { offer(e); }

public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture <?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); } if (queue[0] == e) { leader = null; available.signal(); } } finally { lock.unlock(); } return true; }

private void siftUp(int k, RunnableScheduledFuture <?> key) { while (k > 0) { int parent = (k - 1) >>> 1; RunnableScheduledFuture<?> e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); }

ScheduledFutureTask 是 RunnableScheduledFuture 的唯一实现类,它实现了 Comparable 接口。先按照 time(下一次运行的时间)比较,然后按 seq 比较,最后按 delay 比较。 public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask <?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }

take(阻塞获取) public RunnableScheduledFuture <?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) // 队列为空,则阻塞 available.await(); else { long delay = first.getDelay(NANOSECONDS); // 已经过期,则移除 if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 未过期,则等待相应时间 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }

定期执行任务 执行主要分为两大部分: 1)当调用 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate()方法或 scheduleAtFixedDelay()方法时,会向 DelayedWorkQueue 添加一个实现了 RunnableScheduledFuture 接口的 ScheduledFutureTask。 2)线程池中的线程从 DelayedWorkQueue 中获取 ScheduledFutureTask,然后执行任务。

scheduleAtFixedRate(Runnable 包装为 ScheduledFutureTask) public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); // 将 Runnable 包装成 ScheduledFutureTask,它实现了 RunnableScheduledFuture 接口 ScheduledFutureTask <Void> sft = new ScheduledFutureTask <Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture <Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }

  1. ScheduledFutureTask private class ScheduledFutureTask <V> extends FutureTask <V> implements RunnableScheduledFuture <V> {} 成员变量 /*_ Sequence number to break ties FIFO _/ private final long sequenceNumber;

/*_ The time the task is enabled to execute in nanoTime units _/ private long time;

/**

  • Period in nanoseconds for repeating tasks. A positive
  • value indicates fixed-rate execution. A negative value
  • indicates fixed-delay execution. A value of 0 indicates a
  • non-repeating task. */ private final long period;

/*_ The actual task to be re-enqueued by reExecutePeriodic _/ RunnableScheduledFuture <V> outerTask = this;

/**

  • Index into delay queue, to support faster cancellation. */ int heapIndex; 构造方法 ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); }

public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }

public static <T> Callable <T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter <T>(task, result); }

static final class RunnableAdapter <T> implements Callable <T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }

  1. delayedExecute(入队) private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { // 加入到任务队列中 super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else // 确保至少一个线程在处理任务,即使核心线程数 corePoolSize 为 0 ensurePrestart(); } } 2.1) ensurePrestart(添加工作线程直至 corePoolSize) void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }
  2. ScheduledFutureTask#run public void run() { // 判断是不是定时周期调度的 boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) //调用 FutureTask 的 run 方法 ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { //计算下一次执行时间 setNextRunTime(); // 重新入队 reExecutePeriodic(outerTask); } }

private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); }

void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task); if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } }

练习题 生产者消费者几种实现方式

wait&notify public class TestProducerConsumer { public static void main(String[] args) { SyncStack ss = new SyncStack(); Producer p = new Producer(ss); Consumer c = new Consumer(ss); new Thread(p, "A").start(); new Thread(p, "B").start(); new Thread(c).start(); } }

class Food { private String id;

public Food(String id) {
	this.id = id;
}

public String toString() {
	return "产品" + id;
}

}

class SyncStack { private int index = 0; private Food[] foods = new Food[6];

public SyncStack() {
}

public synchronized void push(Food f) {
	while (index == foods.length) {
		try {
			System.out.println("容器已满");
			this.wait();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	foods[index] = f;
	index++;
	this.notifyAll();
}

public synchronized Food pop() {
	while (index == 0) {
		try {
			System.out.println("容器已空");
			this.wait();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	index--;
	this.notifyAll();
	return foods[index];
}

}

class Producer implements Runnable { private SyncStack ss;

public Producer(SyncStack ss) {
	this.ss = ss;
}

public void run() {
	for (int i = 0; i < 10; i++) {
		Food f = new Food(Thread.currentThread().getName() + i);
		ss.push(f);
		System.out.println("生产者"+Thread.currentThread().getName() + "生产了 " + f);
		try {
			Thread.sleep((int) (Math.random() * 1000));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

}

class Consumer implements Runnable { private SyncStack ss;

public Consumer(SyncStack ss) {
	this.ss = ss;
}

public void run() {
	for (int i = 0; i < 20; i++) {
		Food f = ss.pop();
		System.out.println("消费了 " + f);
		try {
			Thread.sleep((int) (Math.random() * 1000));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

}

Lock&Condition public class TestProducerConsumer { public static void main(String[] args) { SyncStack ss = new SyncStack(); Producer p = new Producer(ss); Consumer c = new Consumer(ss); new Thread(p, "A").start(); new Thread(p, "B").start(); new Thread(c, "C").start(); new Thread(c, "D").start(); } }

class Food { private String id;

public Food(String id) {
	this.id = id;
}

public String toString() {
	return "产品" + id;
}

}

class SyncStack { private int index = 0; private Food[] foods = new Food[6]; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition();

public SyncStack() {
}

public void push(Food f) {
	lock.lock();
	try {
		while (index == foods.length) {
			try {
				System.out.println("容器已满");
				condition.await();
				//相当于this.wait()
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		foods[index] = f;
		index++;
		condition.signalAll();
		//相当于this.notifyAll()
	} finally {
		lock.unlock();
	}
}

public Food pop() {
	lock.lock();
	try {
		while (index == 0) {
			try {
				System.out.println("容器已空");
				condition.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		index--;
		condition.signalAll();
	} finally {
		lock.unlock();
	}
	return foods[index];
}

}

class Producer implements Runnable { private SyncStack ss;

public Producer(SyncStack ss) {
	this.ss = ss;
}

public void run() {
	for (int i = 0; i < 10; i++) {
		Food f = new Food(Thread.currentThread().getName() + i);
		ss.push(f);
		System.out.println("生产者" + Thread.currentThread().getName() + "生产了 " + f);
		try {
			Thread.sleep((int) (Math.random() * 1000));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

}

class Consumer implements Runnable { private SyncStack ss;

public Consumer(SyncStack ss) {
	this.ss = ss;
}

public void run() {
	for (int i = 0; i < 10; i++) {
		Food f = ss.pop();
		System.out.println("消费者" + Thread.currentThread().getName() + "消费了 " + f);
		try {
			Thread.sleep((int) (Math.random() * 1000));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

}

BlockingQueue public class TestProducerConsumer { public static void main(String[] args) { BlockingQueue <Food> queue = new ArrayBlockingQueue <Food>(6); Producer p = new Producer(queue); Consumer c = new Consumer(queue); new Thread(p, "A").start(); new Thread(p, "B").start(); new Thread(c, "C").start(); new Thread(c, "D").start(); } }

class Food { private String id;

public Food(String id) {
	this.id = id;
}

public String toString() {
	return "产品" + id;
}

}

class Producer implements Runnable { private BlockingQueue <Food> foods;

public Producer(BlockingQueue`<Food>` foods) {
	this.foods = foods;
}

public void run() {
	for (int i = 0; i < 10; i++) {
		Food f = new Food(Thread.currentThread().getName() + i);
		try {
			foods.put(f);
			System.out.println("生产者" + Thread.currentThread().getName() + "生产了 " + f);
			Thread.sleep((int) (Math.random() * 1000));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

}

class Consumer implements Runnable {

private BlockingQueue`<Food>` foods;

public Consumer(BlockingQueue`<Food>` foods) {
	this.foods = foods;
}

public void run() {
	for (int i = 0; i < 10; i++) {
		try {
			Food f = foods.take();
			System.out.println("消费者" + Thread.currentThread().getName() + "消费了 " + f);
			Thread.sleep((int) (Math.random() * 1000));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

}

线程按序交替执行 设置 3 个线程,线程名分别为 123,按照 123 的顺序打印,重复 20 遍。 public class TestAlternate { public static void main(String[] args) { int threadNum = 3; int loopTimes = 20; AlternativeDemo atomicDemo = new AlternativeDemo(threadNum, loopTimes); for (int i = 1; i <= threadNum; ++i) { new Thread(atomicDemo, String.valueOf(i)).start(); } } // 所有线程共享 lock 和 conditions private static class AlternativeDemo implements Runnable { private int nextThread = 1; private Lock lock = new ReentrantLock(); private Condition[] conditions; private int totalTimes;

public AlternativeDemo(int threadNum, int totalTimes) {
        this.totalTimes = totalTimes;
        this.conditions = new Condition[threadNum];
        for (int i = 0; i < threadNum; ++i) {
            conditions[i] = lock.newCondition();
        }
    }

public void run() {
        for (int i = 1; i <= totalTimes; ++i) {
            lock.lock();
            // currentThread 取值为1,2,3
            // currentThread-1为当前线程对应的Condition
            int currentThread = Thread.currentThread().getName().charAt(0) - '0';
            try {

// 下一个不是自己,则等待 if (currentThread != nextThread) { conditions[currentThread - 1].await(); } System.out.println("线程" + currentThread + ":" + currentThread); // 计算下一个要打印的线程 // 3 % 3 + 1 = 1 线程 3 后面的是线程 1 nextThread = nextThread % conditions.length + 1; // 唤醒下一个要打印的线程 conditions[nextThread - 1].signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } } }

线程同步的基本使用习题 public class Test8Questions {

public static void main(String[] args) {
	Number number = new Number();
	Number number2 = new Number();

new Thread(new Runnable() {
		@Override
		public void run() {
			number.getOne();
		}
	}).start();

new Thread(new Runnable() {
		@Override
		public void run() {

// number.getTwo(); number2.getTwo(); } }).start();

/*new Thread(new Runnable() {
		@Override
		public void run() {
			number.getThree();
		}
	}).start();*/

}

}

class Number{

public static synchronized void getOne(){//Number.class
	try {
		Thread.sleep(3000);
	} catch (InterruptedException e) {
	}

System.out.println("one");
}

public synchronized void getTwo(){//this
	System.out.println("two");
}

public void getThree(){
	System.out.println("three");
}

}

1、持有同一个 Number 对象,并都加了锁,按调用顺序打印 2、sleep 方法不会释放锁 3、普通方法不需要锁定对象,直接调用,最先调用 4、两个 Number 对象,实际上并没有并发访问资源 5、静态同步方法锁定的是类的 Class 对象,非静态同步方法锁定的是类的实例,同 4,没有并发访问资源 6、两个方法均为静态同步方法,此时构成并发访问,因为它们锁定的是类的同一个 Class 对象 7、同 4,没有并发访问资源 8、同 6,虽然是不同实例,但对应着同一个 Class 对象

直击灵魂的 Interrupt 七问 1.Thread.interrupt()方法和 InterruptedException 异常的关系?是由 interrupt 触发产生了 InterruptedException 异常? 2.Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING? 3.一般 Thread 编程需要关注 interrupt 中断不?一般怎么处理?可以用来做什么? 4.LockSupport.park()和 unpark(),与 object.wait()和 notify()的区别? 5.LockSupport.park(Object blocker)传递的 blocker 对象做什么用? 6.LockSupport 能响应 Thread.interrupt()事件不?会抛出 InterruptedException 异常? 7.Thread.interrupt()处理是否有对应的回调函数?类似于钩子调用?

  1. Thread.interrupt()只是在 Object.wait() .Object.join(), Object.sleep()几个方法会主动抛出 InterruptedException 异常。而在其他的 block 场景,只是通过设置了 Thread 的一个标志位信息,需要程序自己进行处理。 在 J.U.C 里面的 ReentrantLock、Condition 等源码都是自己去检测中断标志位,然后抛出 InterruptedException。 if (Thread.interrupted()) // Clears interrupted status!throw new InterruptedException();
  2. Thread.interrupt 设计的目的主要是用于处理线程处于 block 状态,比如 wait(),sleep()状态就是个例子。但可以在程序设计时为支持 task cancel,同样可以支持 RUNNING 状态。比如 Object.join()和一些支持 interrupt 的一些 nio channel 设计。
  3. interrupt 用途: unBlock 操作,支持任务 cancel, 数据清理等。
  1. 面向的主体不一样。LockSuport 主要是针对 Thread 进进行阻塞处理,可以指定阻塞队列的目标对象,每次可以指定具体的线程唤醒。Object.wait()是以对象为纬度,阻塞当前的线程和唤醒单个(随机)或者所有线程。
  2. 实现机制不同。虽然 LockSuport 可以指定 monitor 的 object 对象,但和 object.wait(),两者的阻塞队列并不交叉。可以看下测试例子。object.notifyAll()不能唤醒 LockSupport 的阻塞 Thread.
  1. 对应的 blcoker 会记录在 Thread 的一个 parkBlocker 属性中,通过 jstack 命令可以非常方便的监控具体的阻塞对象.
  2. 能响应 interrupt 事件,但不会抛出 InterruptedException 异常
上次更新: 2023/08/06, 22:51:57
Java基础
Spring源码解析

← Java基础 Spring源码解析→

最近更新
01
MySQL开发规范及慢查询优化
08-25
02
linux增加swap交换空间
08-16
03
uni-app云打包Android Apk
08-13
更多文章>
| Copyright © 2022-2023 yolofyi.com - All rights reserved | 鄂ICP备2022003053号 |
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式