背景

  本文基于JDK 11,主要介绍FutureTask类中的run()、get()和cancel() 方法,没有过多解析相应interface中的注释,但阅读源码时建议先阅读注释,明白方法的主要的功能,再去看源码会更快。

  文中若有不正确的地方欢迎大伙留言指出,谢谢了!

  FutureTask类图如下(使用IDEA生成)。如图所示,FutureTask实现了Future接口的所有方法,并且实现了Runnable接口,其中,Runnable接口的现实类用于被线程执行,而Future代表的是异步计算的结果。因此,FutureTask类可以理解为,执行run()(实现Runnable接口中的方法),通过Future的get()方法获取结果。

  1. //任务线程总共有七中状态如下:
  2. * Possible state transitions:
  3. * NEW -> COMPLETING -> NORMAL
  4. * NEW -> COMPLETING -> EXCEPTIONAL
  5. * NEW -> CANCELLED
  6. * NEW -> INTERRUPTING -> INTERRUPTED
  7. */
  8. private volatile int state;
  9. private static final int NEW = 0;
  10. private static final int COMPLETING = 1;
  11. private static final int NORMAL = 2;
  12. private static final int EXCEPTIONAL = 3;
  13. private static final int CANCELLED = 4;
  14. private static final int INTERRUPTING = 5;
  15. private static final int INTERRUPTED = 6;
  16. /** The underlying callable; nulled out after running */
  17. //在run()方法中调用
  18. private Callable<V> callable;
  19. /** The result to return or exception to throw from get() */
  20. //任务执行结果,callable.call()正常执行的返回值
  21. private Object outcome; // non-volatile, protected by state reads/writes
  22. /** The thread running the callable; CASed during run() */
  23. //任务线程
  24. private volatile Thread runner;
  25. /** Treiber stack of waiting threads */
  26. //等待任务结果的线程组成的节点,放在链表对列中
  27. private volatile WaitNode waiters;
  1. public void run() {
  2. //1、若是任务的状态不是NEW,且使用CAS将runner置为当前线程则直接返回
  3. if (state != NEW ||
  4. !RUNNER.compareAndSet(this, null, Thread.currentThread()))
  5. return;
  6. try {
  7. Callable<V> c = callable;
  8. //2、任务不为null,且state的状态为NEW的情况下才执行任务
  9. if (c != null && state == NEW) {
  10. V result;
  11. boolean ran;
  12. try {
  13. //执行任务并接收执行结果
  14. result = c.call();
  15. //正常执行结果则将标识置为true
  16. ran = true;
  17. } catch (Throwable ex) {
  18. //3、任务发生异常,执行或cancel(),则结果置为null,并记录异常信息
  19. result = null;
  20. ran = false;
  21. setException(ex);
  22. }
  23. //4、任务正常结束,则设置返回结果
  24. if (ran)
  25. set(result);
  26. }
  27. } finally {
  28. // runner must be non-null until state is settled to
  29. // prevent concurrent calls to run()
  30. runner = null;
  31. // state must be re-read after nulling runner to prevent
  32. // leaked interrupts
  33. int s = state;
  34. //5、若是异常导致,走另一个流程
  35. if (s >= INTERRUPTING)
  36. handlePossibleCancellationInterrupt(s);
  37. }
  38. }

  1)若任务的状态不是NEW,或者使用CAS将runner置为当前线程失败,则直接返回的原因是防止多线程调用;

  2)再度确认任务执行的前置条件;

  3)任务执行异常,将result置为null,并记录异常,setException()源码如下:

  1. protected void setException(Throwable t) {
  2. //使用CAS将状态置为中间态COMPLETING
  3. if (STATE.compareAndSet(this, NEW, COMPLETING)) {
  4. outcome = t;
  5. STATE.setRelease(this, EXCEPTIONAL); // final state
  6. //任务处于结束态时,遍历唤醒等待result的线程
  7. finishCompletion();
  8. }
  9. }

  任务的状态变化为NEW  – >  COMPLETING  ->  EXCEPTIONAL

  4)任务正常结果则会设置result之后,唤醒waitNode的链表对列中等待任务结果的线程;

  5)异常后的调用逻辑如下:

  1. //保证调用cancel在run方法返回之前中断执行任务
  2. private void handlePossibleCancellationInterrupt(int s) {
  3. // It is possible for our interrupter to stall before getting a
  4. // chance to interrupt us. Let's spin-wait patiently.
  5. if (s == INTERRUPTING)
  6. //自旋等待
  7. while (state == INTERRUPTING)
  8. //当前线程让出CPU执行权
  9. Thread.yield(); // wait out pending interrupt
  10. }

  源码分析如下:

  1. public V get() throws InterruptedException, ExecutionException {
  2. int s = state;
  3. if (s <= COMPLETING)
  4. //等待任务完成
  5. s = awaitDone(false, 0L);
  6. //返回结果
  7. return report(s);
  8. }

  其中,等待过程分析如下:

  1. private int awaitDone(boolean timed, long nanos)
  2. throws InterruptedException {
  3. // The code below is very delicate, to achieve these goals:
  4. // - call nanoTime exactly once for each call to park
  5. // - if nanos <= 0L, return promptly without allocation or nanoTime
  6. // - if nanos == Long.MIN_VALUE, don't underflow
  7. // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
  8. // and we suffer a spurious wakeup, we will do no worse than
  9. // to park-spin for a while
  10. long startTime = 0L; // Special value 0L means not yet parked
  11. WaitNode q = null;
  12. boolean queued = false;
  13. for (;;) {
  14. int s = state;
  15. //1、任务的状态已经处于最终的状态,则将任务线程的引用置为null,直接返回状态
  16. if (s > COMPLETING) {
  17. if (q != null)
  18. q.thread = null;
  19. return s;
  20. }
  21. //2、任务的状态为COMPLETING说明任务已经接近完成,则当前线程让出CPU权限以便任务执行线程获取到CPU执行权
  22. else if (s == COMPLETING)
  23. // We may have already promised (via isDone) that we are done
  24. // so never return empty-handed or throw InterruptedException
  25. Thread.yield();
  26. //3、当前线程被中断,则将当前线程从等待任务结果的对列中移除,并抛出异常
  27. else if (Thread.interrupted()) {
  28. removeWaiter(q);
  29. throw new InterruptedException();
  30. }
  31. //4、任务线程的状态小于COMPLETING,则将当前调用get()方法的线程新建一个Node
  32. else if (q == null) {
  33. if (timed && nanos <= 0L)
  34. return s;
  35. q = new WaitNode();
  36. }
  37. //5、若由当前线程构成的Node未加入链表中,则加入
  38. else if (!queued)
  39. queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
  40. //6、是否开启了超时获取结果
  41. else if (timed) {
  42. final long parkNanos;
  43. if (startTime == 0L) { // first time
  44. startTime = System.nanoTime();
  45. if (startTime == 0L)
  46. startTime = 1L;
  47. parkNanos = nanos;
  48. } else {
  49. long elapsed = System.nanoTime() - startTime;
  50. //7、超时则从栈中移除当前线程
  51. if (elapsed >= nanos) {
  52. removeWaiter(q);
  53. return state;
  54. }
  55. parkNanos = nanos - elapsed;
  56. }
  57. // nanoTime may be slow; recheck before parking
  58. //当前线程挂起
  59. if (state < COMPLETING)
  60. LockSupport.parkNanos(this, parkNanos);
  61. }
  62. else
  63. LockSupport.park(this);
  64. }
  65. }

  获取到返回的状态值后,根据其状态值判断是返回结果还是抛出异常。

  1. public boolean cancel(boolean mayInterruptIfRunning) {
  2. //1、若任务线程的状态为NEW,则将其状态从NEW置为INTERRUPTING、CANCELLED
  3. if (!(state == NEW && STATE.compareAndSet
  4. (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
  5. //CAS改变任务线程的状态失败,则直接返回false,表示cancel失败
  6. return false;
  7. try { // in case call to interrupt throws exception
  8. //2、改变任务线程的状态成功后,根据是否中断running的任务线程的标识位,决定是否中断正在运行的任务线程
  9. if (mayInterruptIfRunning) {
  10. try {
  11. Thread t = runner;
  12. //任务线程不为null,则使用interrupt()中断
  13. if (t != null)
  14. t.interrupt();
  15. } finally { // final state
  16. //设置状态
  17. STATE.setRelease(this, INTERRUPTED);
  18. }
  19. }
  20. } finally {
  21. //3、清理等待任务结果的等待线程
  22. finishCompletion();
  23. }
  24. return true;
  25. }

  1)执行run()方法,是在调用在Callable的call()方法,其实在初始化时被指定;

  2)调用get()方法,若是任务线程还在执行,则会把调用get的线程封装成waitNode塞入到FutureTask类内部的阻塞链表对列中,可以有多个线程同时调用get()方法;

  3)cancel()方法是通过对任务线程调用interrupt()实现;

版权声明:本文为love-yh原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/love-yh/p/13375236.html