并发系列(二)——FutureTask类源码简析
背景
本文基于JDK 11,主要介绍FutureTask类中的run()、get()和cancel() 方法,没有过多解析相应interface中的注释,但阅读源码时建议先阅读注释,明白方法的主要的功能,再去看源码会更快。
文中若有不正确的地方欢迎大伙留言指出,谢谢了!
1、FutureTask类图
1.1 FutureTask简介
FutureTask类图如下(使用IDEA生成)。如图所示,FutureTask实现了Future接口的所有方法,并且实现了Runnable接口,其中,Runnable接口的现实类用于被线程执行,而Future代表的是异步计算的结果。因此,FutureTask类可以理解为,执行run()(实现Runnable接口中的方法),通过Future的get()方法获取结果。
1.2 FutureTask的属性
- //任务线程总共有七中状态如下:
- * Possible state transitions:
- * NEW -> COMPLETING -> NORMAL
- * NEW -> COMPLETING -> EXCEPTIONAL
- * NEW -> CANCELLED
- * NEW -> INTERRUPTING -> INTERRUPTED
- */
- private volatile int state;
- private static final int NEW = 0;
- private static final int COMPLETING = 1;
- private static final int NORMAL = 2;
- private static final int EXCEPTIONAL = 3;
- private static final int CANCELLED = 4;
- private static final int INTERRUPTING = 5;
- private static final int INTERRUPTED = 6;
- /** The underlying callable; nulled out after running */
- //在run()方法中调用
- private Callable<V> callable;
- /** The result to return or exception to throw from get() */
- //任务执行结果,callable.call()正常执行的返回值
- private Object outcome; // non-volatile, protected by state reads/writes
- /** The thread running the callable; CASed during run() */
- //任务线程
- private volatile Thread runner;
- /** Treiber stack of waiting threads */
- //等待任务结果的线程组成的节点,放在链表对列中
- private volatile WaitNode waiters;
2、源码解析
2.1 run()方法
- public void run() {
- //1、若是任务的状态不是NEW,且使用CAS将runner置为当前线程则直接返回
- if (state != NEW ||
- !RUNNER.compareAndSet(this, null, Thread.currentThread()))
- return;
- try {
- Callable<V> c = callable;
- //2、任务不为null,且state的状态为NEW的情况下才执行任务
- if (c != null && state == NEW) {
- V result;
- boolean ran;
- try {
- //执行任务并接收执行结果
- result = c.call();
- //正常执行结果则将标识置为true
- ran = true;
- } catch (Throwable ex) {
- //3、任务发生异常,执行或cancel(),则结果置为null,并记录异常信息
- result = null;
- ran = false;
- setException(ex);
- }
- //4、任务正常结束,则设置返回结果
- if (ran)
- set(result);
- }
- } finally {
- // runner must be non-null until state is settled to
- // prevent concurrent calls to run()
- runner = null;
- // state must be re-read after nulling runner to prevent
- // leaked interrupts
- int s = state;
- //5、若是异常导致,走另一个流程
- if (s >= INTERRUPTING)
- handlePossibleCancellationInterrupt(s);
- }
- }
1)若任务的状态不是NEW,或者使用CAS将runner置为当前线程失败,则直接返回的原因是防止多线程调用;
2)再度确认任务执行的前置条件;
3)任务执行异常,将result置为null,并记录异常,setException()源码如下:
- protected void setException(Throwable t) {
- //使用CAS将状态置为中间态COMPLETING
- if (STATE.compareAndSet(this, NEW, COMPLETING)) {
- outcome = t;
- STATE.setRelease(this, EXCEPTIONAL); // final state
- //任务处于结束态时,遍历唤醒等待result的线程
- finishCompletion();
- }
- }
任务的状态变化为NEW – > COMPLETING -> EXCEPTIONAL
4)任务正常结果则会设置result之后,唤醒waitNode的链表对列中等待任务结果的线程;
5)异常后的调用逻辑如下:
- //保证调用cancel在run方法返回之前中断执行任务
- private void handlePossibleCancellationInterrupt(int s) {
- // It is possible for our interrupter to stall before getting a
- // chance to interrupt us. Let's spin-wait patiently.
- if (s == INTERRUPTING)
- //自旋等待
- while (state == INTERRUPTING)
- //当前线程让出CPU执行权
- Thread.yield(); // wait out pending interrupt
- }
2.2 get()方法
源码分析如下:
- public V get() throws InterruptedException, ExecutionException {
- int s = state;
- if (s <= COMPLETING)
- //等待任务完成
- s = awaitDone(false, 0L);
- //返回结果
- return report(s);
- }
其中,等待过程分析如下:
- private int awaitDone(boolean timed, long nanos)
- throws InterruptedException {
- // The code below is very delicate, to achieve these goals:
- // - call nanoTime exactly once for each call to park
- // - if nanos <= 0L, return promptly without allocation or nanoTime
- // - if nanos == Long.MIN_VALUE, don't underflow
- // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
- // and we suffer a spurious wakeup, we will do no worse than
- // to park-spin for a while
- long startTime = 0L; // Special value 0L means not yet parked
- WaitNode q = null;
- boolean queued = false;
- for (;;) {
- int s = state;
- //1、任务的状态已经处于最终的状态,则将任务线程的引用置为null,直接返回状态
- if (s > COMPLETING) {
- if (q != null)
- q.thread = null;
- return s;
- }
- //2、任务的状态为COMPLETING说明任务已经接近完成,则当前线程让出CPU权限以便任务执行线程获取到CPU执行权
- else if (s == COMPLETING)
- // We may have already promised (via isDone) that we are done
- // so never return empty-handed or throw InterruptedException
- Thread.yield();
- //3、当前线程被中断,则将当前线程从等待任务结果的对列中移除,并抛出异常
- else if (Thread.interrupted()) {
- removeWaiter(q);
- throw new InterruptedException();
- }
- //4、任务线程的状态小于COMPLETING,则将当前调用get()方法的线程新建一个Node
- else if (q == null) {
- if (timed && nanos <= 0L)
- return s;
- q = new WaitNode();
- }
- //5、若由当前线程构成的Node未加入链表中,则加入
- else if (!queued)
- queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
- //6、是否开启了超时获取结果
- else if (timed) {
- final long parkNanos;
- if (startTime == 0L) { // first time
- startTime = System.nanoTime();
- if (startTime == 0L)
- startTime = 1L;
- parkNanos = nanos;
- } else {
- long elapsed = System.nanoTime() - startTime;
- //7、超时则从栈中移除当前线程
- if (elapsed >= nanos) {
- removeWaiter(q);
- return state;
- }
- parkNanos = nanos - elapsed;
- }
- // nanoTime may be slow; recheck before parking
- //当前线程挂起
- if (state < COMPLETING)
- LockSupport.parkNanos(this, parkNanos);
- }
- else
- LockSupport.park(this);
- }
- }
获取到返回的状态值后,根据其状态值判断是返回结果还是抛出异常。
2.2 cancel()方法
- public boolean cancel(boolean mayInterruptIfRunning) {
- //1、若任务线程的状态为NEW,则将其状态从NEW置为INTERRUPTING、CANCELLED
- if (!(state == NEW && STATE.compareAndSet
- (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
- //CAS改变任务线程的状态失败,则直接返回false,表示cancel失败
- return false;
- try { // in case call to interrupt throws exception
- //2、改变任务线程的状态成功后,根据是否中断running的任务线程的标识位,决定是否中断正在运行的任务线程
- if (mayInterruptIfRunning) {
- try {
- Thread t = runner;
- //任务线程不为null,则使用interrupt()中断
- if (t != null)
- t.interrupt();
- } finally { // final state
- //设置状态
- STATE.setRelease(this, INTERRUPTED);
- }
- }
- } finally {
- //3、清理等待任务结果的等待线程
- finishCompletion();
- }
- return true;
- }
3、总结
1)执行run()方法,是在调用在Callable的call()方法,其实在初始化时被指定;
2)调用get()方法,若是任务线程还在执行,则会把调用get的线程封装成waitNode塞入到FutureTask类内部的阻塞链表对列中,可以有多个线程同时调用get()方法;
3)cancel()方法是通过对任务线程调用interrupt()实现;