ThreadPoolExecutor 和 CompletableFuture 实践
池化思想,维护一个池子,减少创建/销毁的开销,合理分配系统资源,但会增加调度开销。参考美团技术博客:底层设计思想、业务实现
ThreadPoolExecutor 线程池实现类
执行流程图
线程池生命周期
任务调度流程
所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:
- 首先检测线程池运行状态,如果不是
RUNNING
,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。 - 如果
workerCount < corePoolSize
,则创建并启动一个线程来执行新提交的任务。 - 如果
workerCount >= corePoolSize,且线程池内的阻塞队列未满
,则将任务添加到该阻塞队列中。 - 如果
workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满
,则创建并启动一个线程来执行新提交的任务。 - 如果
workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满
, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
底层结构
ThreadPoolExecutor 构造函数
public ThreadPoolExecutor( |
核心线程数:CPU 密集型任务(N+1) I/O 密集型任务(2N) CPU 核心数 N
阻塞队列 BlockingQueue
阻塞队列中保存即将运行的任务,BlockingQueue 与 Queue 的主要区别是:
- 通过在入队和出队时进行加锁,保证了队列线程安全
- 支持阻塞的入队和出队方法:当队列满时,会阻塞入队的线程,直到队列不满;当队列为空时,会阻塞出队的线程,直到队列中有元素。
底层实现
- 采用 ReentrantLock 同步队列实现加锁解锁、阻塞释放
- 队列为空或已满的情况,采用条件队列 Condition
阻塞队列 | 描述 |
---|---|
ArrayBlockingQueue | 有界队列 |
LinkedBlockingQueue | 不设置大小的话,默认无界队列 |
SynchronousQueue | 如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务 |
DelayedWorkQueue | 内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序 |
PriorityBlockingQueue | 优先级阻塞队列 |
创建线程工厂类 ThreadFactory
参数可以设定线程名
- java rt包:原生 ThreadFactory 类
- hutool: ThreadFactoryBuilder
- google guava.jar: ThreadFactoryBuilder
自定义线程工厂
public static class testThreadPoolFactory implements ThreadFactory { |
任务拒绝策略
在使用线程池并且使用有界队列的时候,如果队列满了,任务添加到线程池的时候就会拒绝,具体的拒绝策略可以设定
拒绝策略 | 描述 |
---|---|
AbortPolicy | 线程池队列满了丢掉这个任务并且抛出 RejectedExecutionException 异常 |
DiscardPolicy | 如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常 |
DiscardOldestPolicy | 如果队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列 |
CallerRunsPolicy | 如果添加到线程池失败,那么主线程会自己去执行该任务 |
自定义拒绝策略
public class MyRejectPolicy implements RejectedExecutionHandler{ |
线程池实现接口
ExecutorService接口:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成 Future 的方法;(2)提供了管控线程池的方法,比如停止线程池的运行。
public interface ExecutorService extends Executor { |
顶层接口 Executor :将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。
public interface Executor { |
Executors 线程池工厂类
FixedThreadPool 固定线程数
运行中的 FixedThreadPool(未执行 shutdown()或 shutdownNow())不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)
public static ExecutorService newFixedThreadPool(int nThreads) { |
SingleThreadExecutor 只有一个线程的线程池(FixedThreadPool 的特例)
CachedThreadPool 根据需要创建新线程的线程池
CompletableFuture 异步执行结果类
RxJava、Reactor 比 CompletableFuture 增加了操作融合、延迟执行、回压等特性
CompletableFuture实现 Future、CompletionStage 接口。
- Future 表示异步计算的结果,可以通过
Future
类获取到耗时任务的执行结果 - CompletionStage 用于表示异步执行过程中的一个步骤(Stage),这个步骤可能是由另外一个 CompletionStage 触发的,随着当前步骤的完成,也可能会触发其他一系列 CompletionStage 的执行。从而我们可以根据实际业务对这些步骤进行多样化的编排组合,CompletionStage 接口正是定义了这样的能力,我们可以通过其提供的 thenAppy、thenCompose 等函数式编程方法来组合编排这些步骤。
FutureTask
相当于对Callable
进行了封装,管理着任务执行的情况,存储了 Callable
的 call
方法的任务执行结果
注意事项
- Future 需要获取返回值,才能获取异常信息(即线程需要实现
Callable
接口,而Runnable
接口创建线程不会返回结果或抛出检查异常) - CompletableFuture 的 get() 方法是阻塞等待的
- 异步调用强制传自定义线程池,做好线程池隔离
- 自定义线程池时,仔细考虑延迟队列满时,应采取什么拒绝策略
Q1:程序执行在哪个线程上?
A1:
同步方法(即不带Async后缀的方法)有两种情况。
-
如果注册时被依赖的操作已经执行完成,则直接由当前线程执行。
-
如果注册时被依赖的操作还未执行完,则由回调线程(按照当前 CF 运行情况区分)执行。
异步方法(即带Async后缀的方法):可以选择是否传递线程池参数Executor运行在指定线程池中;当不传递Executor时,会使用ForkJoinPool中的共用线程池CommonPool
Q2:同步回调和异步回调的区别
A2:
同步回调:把函数 b 传递给函数 a。执行 a 的时候,回调了 b,a 要一直等到 b 执行完才能继续执行;
异步回调:把函数 b 传递给函数 a。执行 a 的时候,回调了 b,然后 a 就继续往后执行,b 独自执行。
线程池循环引用会导致死锁
public Object doGet() { |
threadPool1 大小为 10,当同一时刻有 10 个请求到达,则 threadPool1 被打满,子任务请求线程时进入阻塞队列排队,但是父任务的完成又依赖于子任务,这时由于子任务得不到线程,父任务无法完成。主线程执行 cf1.join() 进入阻塞状态,并且永远无法恢复。
异步RPC调用注意不要阻塞IO线程池
如果是使用基于 NIO(比如Netty)的异步 RPC,则返回结果是由 IO 线程负责设置的,即回调方法由 IO 线程触发,CompletableFuture 同步回调(如thenApply、thenAccept等无Async后缀的方法)如果依赖的异步 RPC 调用的返回结果,那么这些同步回调将运行在IO线程上,而整个服务只有一个IO线程池,这时需要保证同步回调中不能有阻塞等耗时过长的逻辑,否则在这些逻辑执行完成前,IO线程将一直被占用,影响整个服务的响应。
实际使用场景
服务端和客户端信息交换频繁,提升系统吞吐量
I/O密集型任务,对内调度各个下游服务获取数据进行聚合
对业务流程进行编排,降低任务依赖导致的阻塞
避免 guava 的 ListenableFuture 回调所导致的回调地狱
零依赖-构造方法
ExecutorService executor = Executors.newFixedThreadPool(5); |
一元依赖-异步回调
CompletableFuture<String> cf3 = cf1.thenApply(result1 -> { |
二元依赖-多任务
CompletableFuture<String> cf4 = cf1.thenCombine(cf2, (result1, result2) -> { |
多元依赖
CompletableFuture<Void> cf6 = CompletableFuture.allOf(cf3, cf4, cf5); |
使用方法
构造方法
// 使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务 |
获取结果方法
public T get() |
异步回调方法
// 执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池 |
异常处理方法
由于异步执行的任务在其他线程上执行,而异常信息存储在线程栈中,因此当前线程除非阻塞等待返回结果,否则无法通过try\catch捕获异常。CompletableFuture提供了异常捕获回调exceptionally,相当于同步调用中的try\catch。使用方法如下所示:
|
当 CompletableFuture 的任务不论是正常完成还是出现异常它都会调用 whenComplete
这回调函数。
- 正常完成:
whenComplete
返回结果和上级任务一致,异常为 null; - 出现异常:
whenComplete
返回结果为 null,异常为上级任务的异常;
多任务组合回调方法
thenCompose 可以用于组合多个CompletableFuture,将前一个任务的返回结果作为下一个任务的参数,它们之间存在着业务逻辑上的先后顺序,前后线程执行是同步的,前面阻塞后面不会执行。
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn); |
thenCombine 第一个入参 CompletionStage 是异步的,合并结果的BiFunction是同步的。
public <U,V> CompletionStage<V> thenCombine (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn); |
applyToEither方法返回一个新的CompletionStage,当此阶段或另一个给定阶段正常完成时,将使用相应的结果作为所提供函数的参数来执行。
public <U> CompletionStage<U> applyToEither (CompletionStage<? extends T> other, Function<? super T, U> fn); |
底层代码
result 用于存储当前CF的结果,stack(Completion)表示当前CF完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作可以有多个(表示有多个依赖它的CF),以栈(Treiber stack)的形式存储,stack表示栈顶元素。
观察者模式:依赖动作(Dependency Action)都封装在一个单独Completion子类中。CompletableFuture 中的每个方法都对应了图中的一个 Completion 的子类,Completion本身是观察者的基类。
一元依赖中的 thenApply 为例,举例“观察者模式”的设计思想
被观察者
- 每个CompletableFuture都可以被看作一个被观察者,其内部有一个Completion类型的链表成员变量stack,用来存储注册到其中的所有观察者。当被观察者执行完成后会弹栈stack属性,依次通知注册到其中的观察者。上面例子中步骤fn2就是作为观察者被封装在UniApply中。
- 被观察者CF中的result属性,用来存储返回结果数据。这里可能是一次RPC调用的返回值,也可能是任意对象,在上面的例子中对应步骤fn1的执行结果。
观察者
CompletableFuture支持很多回调方法,例如thenAccept、thenApply、exceptionally等,这些方法接收一个函数类型的参数 f,生成一个 Completion 类型的对象(即观察者),并将入参函数 f 赋值给 Completion 的成员变量 fn,然后检查当前 CF 是否已处于完成状态(即result != null),如果已完成直接触发 fn,否则将观察者 Completion 加入到 CF 的观察者链 stack 中,再次尝试触发,如果被观察者未执行完则其执行完毕之后通知触发。
- 观察者中的 dep 属性:指向其对应的 CompletableFuture,在上面的例子中 dep 指向 CF2。
- 观察者中的 src 属性:指向其依赖的 CompletableFuture,在上面的例子中 src 指向 CF1。
- 观察者 Completion 中的 fn 属性:用来存储具体的等待被回调的函数。这里需要注意的是不同的回调方法(thenAccept、thenApply、exceptionally等)接收的函数类型也不同,即fn的类型有很多种,在上面的例子中fn指向fn2。
Q1:为什么要在入栈前和入栈后都检查result == null
A1:因为查 result 和判断 result==null 是两个操作,CompletableFuture 没有对其加锁,若被观察者在这两个操作数之间完成,会导致观察者得不到通知
Q2:当依赖多个CF时,观察者会被压入所有依赖的CF的栈中,每个CF完成的时候都会进行,那么会不会导致一个操作被多次执行呢 ?如下图所示,即当CF1、CF2同时完成时,如何避免CF3被多次触发。
A2:CompletableFuture的实现是这样解决该问题的:观察者在执行之前会先通过CAS操作设置一个状态位,将status由0改为1。如果观察者已经执行过了,那么CAS操作将会失败,取消执行。
CompletableFuture 处理并行问题的整体流程图:
二元依赖
thenCombine 操作表示依赖两个 CompletableFuture。其观察者实现类为 BiApply,如上图所示,BiApply 通过 src 和 snd 两个属性关联被依赖的两个 CF,fn 属性的类型为 BiFunction。与单个依赖不同的是,在依赖的CF未完成的情况下,thenCombine 会尝试将 BiApply 压入这两个被依赖的 CF 的栈中,每个被依赖的 CF 成时都会尝试触发观察者 BiApply,BiApply 会检查两个依赖是否都完成,如果完成则开始执行。这里为了解决重复触发的问题,同样用的是上一章节提到的 CAS 操作,执行时会先通过 CAS 设置状态位,避免重复触发。
// 状态位 Status 定义于 ForkJoinTask 类中 |
多元依赖
依赖多个CompletableFuture的回调方法包括allOf
、anyOf
,区别在于allOf
观察者实现类为BiRelay,需要所有被依赖的CF完成后才会执行回调;而anyOf
观察者实现类为OrRelay,任意一个被依赖的CF完成后就会触发。二者的实现方式都是将多个被依赖的CF构建成一棵平衡二叉树,执行结果层层通知,直到根节点,触发回调监听。