ThreadPoolExecutor 和 CompletableFuture 实践

池化思想,维护一个池子,减少创建/销毁的开销,合理分配系统资源,但会增加调度开销。参考美团技术博客:底层设计思想、业务实现

ThreadPoolExecutor 线程池实现类

执行流程图

线程池生命周期

任务调度流程

所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:

  1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
  2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
  3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

底层结构

ThreadPoolExecutor 构造函数

public ThreadPoolExecutor(
int corePoolSize, //线程池的核心线程数量
int maximumPoolSize, //线程池的最大线程数
long keepAliveTime, //当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit, //时间单位
BlockingQueue<Runnable> workQueue, //任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory, //线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler //拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {...}

核心线程数:CPU 密集型任务(N+1) I/O 密集型任务(2N) CPU 核心数 N

阻塞队列 BlockingQueue

Link

阻塞队列中保存即将运行的任务,BlockingQueue 与 Queue 的主要区别是:

  • 通过在入队和出队时进行加锁,保证了队列线程安全
  • 支持阻塞的入队和出队方法:当队列满时,会阻塞入队的线程,直到队列不满;当队列为空时,会阻塞出队的线程,直到队列中有元素。

底层实现

  • 采用 ReentrantLock 同步队列实现加锁解锁、阻塞释放
  • 队列为空或已满的情况,采用条件队列 Condition
阻塞队列 描述
ArrayBlockingQueue 有界队列
LinkedBlockingQueue 不设置大小的话,默认无界队列
SynchronousQueue 如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务
DelayedWorkQueue 内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序
PriorityBlockingQueue 优先级阻塞队列

创建线程工厂类 ThreadFactory

参数可以设定线程名

  • java rt包:原生 ThreadFactory 类
  • hutool: ThreadFactoryBuilder
  • google guava.jar: ThreadFactoryBuilder

自定义线程工厂

public static class testThreadPoolFactory implements ThreadFactory {

private AtomicInteger threadIdx = new AtomicInteger(0);

private String threadNamePrefix;

public testThreadPoolFactory(String Prefix) {
threadNamePrefix = Prefix;
}

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName(threadNamePrefix + "-xxljob-" + threadIdx.getAndIncrement());
return thread;
}
}

任务拒绝策略

在使用线程池并且使用有界队列的时候,如果队列满了,任务添加到线程池的时候就会拒绝,具体的拒绝策略可以设定

拒绝策略 描述
AbortPolicy 线程池队列满了丢掉这个任务并且抛出 RejectedExecutionException 异常
DiscardPolicy 如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
DiscardOldestPolicy 如果队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
CallerRunsPolicy 如果添加到线程池失败,那么主线程会自己去执行该任务

自定义拒绝策略

public class MyRejectPolicy implements RejectedExecutionHandler{
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//Sender是我的Runnable类,里面有message字段
if (r instanceof Sender) {
Sender sender = (Sender) r;
//直接打印
System.out.println(sender.getMessage());
}
}
}

线程池实现接口

ExecutorService接口:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成 Future 的方法;(2)提供了管控线程池的方法,比如停止线程池的运行。

public interface ExecutorService extends Executor {  

//不再接受新任务,待所有任务执行完毕后关闭 ExecutorService
void shutdown();

//不再接受新任务,直接关闭 ExecutorService,返回没有执行的任务列表
List<Runnable> shutdownNow();

//判断ExecutorService是否关闭
boolean isShutdown();

//判断ExecutorService是否终止
boolean isTerminated();

//等待ExecutorService到达终止状态
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

//当task执行成功的时候future.get()返回执行结果
<T> Future<T> submit(Callable<T> task);

//当task执行成功的时候future.get()返回result
<T> Future<T> submit(Runnable task, T result);

//当task执行成功的时候future.get()返回null
Future<?> submit(Runnable task);

//批量提交任务并获得他们的future,Task列表与Future列表一一对应
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

//批量提交任务并获得一个已经成功执行的任务的结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

顶层接口 Executor :将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。

public interface Executor {
// 在未来的某个时间执行 command (方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否)
void execute(Runnable command);
}

Executors 线程池工厂类

FixedThreadPool 固定线程数

运行中的 FixedThreadPool(未执行 shutdown()或 shutdownNow())不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

SingleThreadExecutor 只有一个线程的线程池(FixedThreadPool 的特例)

CachedThreadPool 根据需要创建新线程的线程池

CompletableFuture 异步执行结果类

RxJava、Reactor 比 CompletableFuture 增加了操作融合、延迟执行、回压等特性

CompletableFuture实现 Future、CompletionStage 接口。

  • Future 表示异步计算的结果,可以通过Future 类获取到耗时任务的执行结果
  • CompletionStage 用于表示异步执行过程中的一个步骤(Stage),这个步骤可能是由另外一个 CompletionStage 触发的,随着当前步骤的完成,也可能会触发其他一系列 CompletionStage 的执行。从而我们可以根据实际业务对这些步骤进行多样化的编排组合,CompletionStage 接口正是定义了这样的能力,我们可以通过其提供的 thenAppy、thenCompose 等函数式编程方法来组合编排这些步骤。

FutureTask相当于对Callable 进行了封装,管理着任务执行的情况,存储了 Callablecall 方法的任务执行结果

注意事项

  • Future 需要获取返回值,才能获取异常信息(即线程需要实现 Callable 接口,而 Runnable 接口创建线程不会返回结果或抛出检查异常)
  • CompletableFuture 的 get() 方法是阻塞等待的
  • 异步调用强制传自定义线程池,做好线程池隔离
  • 自定义线程池时,仔细考虑延迟队列满时,应采取什么拒绝策略

Q1:程序执行在哪个线程上?

A1

同步方法(即不带Async后缀的方法)有两种情况。

  • 如果注册时被依赖的操作已经执行完成,则直接由当前线程执行。

  • 如果注册时被依赖的操作还未执行完,则由回调线程(按照当前 CF 运行情况区分)执行。

异步方法(即带Async后缀的方法):可以选择是否传递线程池参数Executor运行在指定线程池中;当不传递Executor时,会使用ForkJoinPool中的共用线程池CommonPool

Q2:同步回调和异步回调的区别

A2

同步回调:把函数 b 传递给函数 a。执行 a 的时候,回调了 b,a 要一直等到 b 执行完才能继续执行;

异步回调:把函数 b 传递给函数 a。执行 a 的时候,回调了 b,然后 a 就继续往后执行,b 独自执行。

线程池循环引用会导致死锁

public Object doGet() {
ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
//do sth
return CompletableFuture.supplyAsync(() -> {
System.out.println("child");
return "child";
}, threadPool1).join();//子任务
}, threadPool1);
return cf1.join();
}

threadPool1 大小为 10,当同一时刻有 10 个请求到达,则 threadPool1 被打满,子任务请求线程时进入阻塞队列排队,但是父任务的完成又依赖于子任务,这时由于子任务得不到线程,父任务无法完成。主线程执行 cf1.join() 进入阻塞状态,并且永远无法恢复。

异步RPC调用注意不要阻塞IO线程池

如果是使用基于 NIO(比如Netty)的异步 RPC,则返回结果是由 IO 线程负责设置的,即回调方法由 IO 线程触发,CompletableFuture 同步回调(如thenApply、thenAccept等无Async后缀的方法)如果依赖的异步 RPC 调用的返回结果,那么这些同步回调将运行在IO线程上,而整个服务只有一个IO线程池,这时需要保证同步回调中不能有阻塞等耗时过长的逻辑,否则在这些逻辑执行完成前,IO线程将一直被占用,影响整个服务的响应。

实际使用场景

美团技术博客

服务端和客户端信息交换频繁,提升系统吞吐量

I/O密集型任务,对内调度各个下游服务获取数据进行聚合

对业务流程进行编排,降低任务依赖导致的阻塞

避免 guava 的 ListenableFuture 回调所导致的回调地狱

零依赖-构造方法

ExecutorService executor = Executors.newFixedThreadPool(5);
//1、使用runAsync或supplyAsync发起异步调用
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
return "result1";
}, executor);
//2、CompletableFuture.completedFuture()直接创建一个已完成状态的CompletableFuture
CompletableFuture<String> cf2 = CompletableFuture.completedFuture("result2");
//3、先初始化一个未完成的CompletableFuture,然后通过complete()、completeExceptionally(),完成该CompletableFuture
CompletableFuture<String> cf = new CompletableFuture<>();
cf.complete("success");

一元依赖-异步回调

CompletableFuture<String> cf3 = cf1.thenApply(result1 -> {
//result1为CF1的结果
//......
return "result3";
});
CompletableFuture<String> cf5 = cf2.thenApply(result2 -> {
//result2为CF2的结果
//......
return "result5";
});

二元依赖-多任务

CompletableFuture<String> cf4 = cf1.thenCombine(cf2, (result1, result2) -> {
//result1和result2分别为cf1和cf2的结果
return "result4";
});

多元依赖

CompletableFuture<Void> cf6 = CompletableFuture.allOf(cf3, cf4, cf5);
CompletableFuture<String> result = cf6.thenApply(v -> {
//这里的join并不会阻塞,因为传给thenApply的函数是在CF3、CF4、CF5全部完成时,才会执行 。
result3 = cf3.join();
result4 = cf4.join();
result5 = cf5.join();
//根据result3、result4、result5组装最终result;
return "result";
});

使用方法

方法汇总

构造方法

// 使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务  
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

// 自定义线程池,根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

// 使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable)

// 自定义线程池,根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

获取结果方法

public T get()  
public T get(long timeout, TimeUnit unit)
public T getNow(T valueIfAbsent)
// 不抛出异常
public T join()

异步回调方法

// 执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池
public CompletableFuture<Void> thenRunAsync(Runnable action);

// 执行第二个任务时,使用给定的线程池
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);

// 第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);

// 执行第二个回调方法任务,会将第一个任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

异常处理方法

由于异步执行的任务在其他线程上执行,而异常信息存储在线程栈中,因此当前线程除非阻塞等待返回结果,否则无法通过try\catch捕获异常。CompletableFuture提供了异常捕获回调exceptionally,相当于同步调用中的try\catch。使用方法如下所示:

@Autowired
private WmOrderAdditionInfoThriftService wmOrderAdditionInfoThriftService;//内部接口
public CompletableFuture<Integer> getCancelTypeAsync(long orderId) {
CompletableFuture<WmOrderOpRemarkResult> remarkResultFuture = wmOrderAdditionInfoThriftService.findOrderCancelledRemarkByOrderIdAsync(orderId);//业务方法,内部会发起异步rpc调用
return remarkResultFuture
.thenApply(result -> {//这里增加了一个回调方法thenApply,如果发生异常thenApply内部会通过new CompletionException(throwable) 对异常进行包装
//这里是一些业务操作
})
.exceptionally(err -> {//通过exceptionally 捕获异常,这里的err已经被thenApply包装过,因此需要通过Throwable.getCause()提取异常
log.error("WmOrderRemarkService.getCancelTypeAsync Exception orderId={}", orderId, ExceptionUtils.extractRealException(err));
return 0;
});
}

public class ExceptionUtils {
public static Throwable extractRealException(Throwable throwable) {
//这里判断异常类型是否为CompletionException、ExecutionException,如果是则进行提取,否则直接返回。
if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
if (throwable.getCause() != null) {
return throwable.getCause();
}
}
return throwable;
}
}

当 CompletableFuture 的任务不论是正常完成还是出现异常它都会调用 whenComplete 这回调函数。

  • 正常完成:whenComplete 返回结果和上级任务一致,异常为 null;
  • 出现异常:whenComplete 返回结果为 null,异常为上级任务的异常;

多任务组合回调方法

thenCompose 可以用于组合多个CompletableFuture,将前一个任务的返回结果作为下一个任务的参数,它们之间存在着业务逻辑上的先后顺序,前后线程执行是同步的,前面阻塞后面不会执行。

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;

thenCombine 第一个入参 CompletionStage 是异步的,合并结果的BiFunction是同步的。

public <U,V> CompletionStage<V> thenCombine (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);

applyToEither方法返回一个新的CompletionStage,当此阶段或另一个给定阶段正常完成时,将使用相应的结果作为所提供函数的参数来执行。

public <U> CompletionStage<U> applyToEither (CompletionStage<? extends T> other, Function<? super T, U> fn);

底层代码

result 用于存储当前CF的结果,stack(Completion)表示当前CF完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作可以有多个(表示有多个依赖它的CF),以栈(Treiber stack)的形式存储,stack表示栈顶元素。

观察者模式:依赖动作(Dependency Action)都封装在一个单独Completion子类中。CompletableFuture 中的每个方法都对应了图中的一个 Completion 的子类,Completion本身是观察者的基类。

一元依赖中的 thenApply 为例,举例“观察者模式”的设计思想

被观察者

  1. 每个CompletableFuture都可以被看作一个被观察者,其内部有一个Completion类型的链表成员变量stack,用来存储注册到其中的所有观察者。当被观察者执行完成后会弹栈stack属性,依次通知注册到其中的观察者。上面例子中步骤fn2就是作为观察者被封装在UniApply中。
  2. 被观察者CF中的result属性,用来存储返回结果数据。这里可能是一次RPC调用的返回值,也可能是任意对象,在上面的例子中对应步骤fn1的执行结果。

观察者

CompletableFuture支持很多回调方法,例如thenAccept、thenApply、exceptionally等,这些方法接收一个函数类型的参数 f,生成一个 Completion 类型的对象(即观察者),并将入参函数 f 赋值给 Completion 的成员变量 fn,然后检查当前 CF 是否已处于完成状态(即result != null),如果已完成直接触发 fn,否则将观察者 Completion 加入到 CF 的观察者链 stack 中,再次尝试触发,如果被观察者未执行完则其执行完毕之后通知触发。

  1. 观察者中的 dep 属性:指向其对应的 CompletableFuture,在上面的例子中 dep 指向 CF2。
  2. 观察者中的 src 属性:指向其依赖的 CompletableFuture,在上面的例子中 src 指向 CF1。
  3. 观察者 Completion 中的 fn 属性:用来存储具体的等待被回调的函数。这里需要注意的是不同的回调方法(thenAccept、thenApply、exceptionally等)接收的函数类型也不同,即fn的类型有很多种,在上面的例子中fn指向fn2。

Q1:为什么要在入栈前和入栈后都检查result == null

A1:因为查 result 和判断 result==null 是两个操作,CompletableFuture 没有对其加锁,若被观察者在这两个操作数之间完成,会导致观察者得不到通知

Q2:当依赖多个CF时,观察者会被压入所有依赖的CF的栈中,每个CF完成的时候都会进行,那么会不会导致一个操作被多次执行呢 ?如下图所示,即当CF1、CF2同时完成时,如何避免CF3被多次触发。

A2:CompletableFuture的实现是这样解决该问题的:观察者在执行之前会先通过CAS操作设置一个状态位,将status由0改为1。如果观察者已经执行过了,那么CAS操作将会失败,取消执行。

CompletableFuture 处理并行问题的整体流程图:

二元依赖

thenCombine 操作表示依赖两个 CompletableFuture。其观察者实现类为 BiApply,如上图所示,BiApply 通过 src 和 snd 两个属性关联被依赖的两个 CF,fn 属性的类型为 BiFunction。与单个依赖不同的是,在依赖的CF未完成的情况下,thenCombine 会尝试将 BiApply 压入这两个被依赖的 CF 的栈中,每个被依赖的 CF 成时都会尝试触发观察者 BiApply,BiApply 会检查两个依赖是否都完成,如果完成则开始执行。这里为了解决重复触发的问题,同样用的是上一章节提到的 CAS 操作,执行时会先通过 CAS 设置状态位,避免重复触发。

// 状态位 Status 定义于 ForkJoinTask 类中
abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask{...}

public abstract class ForkJoinTask<V> implements Future<V>{
volatile int status;
}

多元依赖

依赖多个CompletableFuture的回调方法包括allOfanyOf,区别在于allOf观察者实现类为BiRelay,需要所有被依赖的CF完成后才会执行回调;而anyOf观察者实现类为OrRelay,任意一个被依赖的CF完成后就会触发。二者的实现方式都是将多个被依赖的CF构建成一棵平衡二叉树,执行结果层层通知,直到根节点,触发回调监听。