本文关键字:

线程线程池单线程多线程线程池的好处线程回收创建方式核心参数底层机制拒绝策略,参数设置,动态监控线程隔离

线程和线程池相关的知识,是Java学习或者面试中一定会遇到的知识点,本篇我们会从线程和进程,并行与并发,单线程和多线程等,一直讲解到线程池,线程池的好处,创建方式,重要的核心参数,几个重要的方法,底层实现,拒绝策略,参数设置,动态调整,线程隔离等等。主要的大纲如下:

线程池,使用了池化思想来管理线程,池化技术就是为了最大化效益,最小化用户风险,将资源统一放在一起管理的思想。这种思想在很多地方都有使用到,不仅仅是计算机,比如金融,企业管理,设备管理等。

为什么要线程池?如果在并发的场景,编码人员根据需求来创建线程池,可能会有以下的问题:

  • 我们很难确定系统有多少线程在运行,如果使用就创建,不使用就销毁,那么创建和销毁线程的消耗也是比较大的
  • 假设来了很多请求,可能是爬虫,疯狂创建线程,可能把系统资源耗尽。

实现线程池有什么好处呢?

  • 降低资源消耗:池化技术可以重复利用已经创建的线程,降低线程创建和销毁的损耗。
  • 提高响应速度:利用已经存在的线程进行处理,少去了创建线程的时间
  • 管理线程可控:线程是稀缺资源,不能无限创建,线程池可以做到统一分配和监控
  • 拓展其他功能:比如定时线程池,可以定时执行任务

其实池化技术,用在比较多地方,比如:

  • 数据库连接池:数据库连接是稀缺资源,先创建好,提高响应速度,重复利用已有的连接
  • 实例池:先创建好对象放到池子里面,循环利用,减少来回创建和销毁的消耗

下面是与线程池相关的类的继承关系:

Executor 是顶级接口,里面只有一个方法execute(Runnable command),定义的是调度线程池来执行任务,它定义了线程池的基本规范,执行任务是它的天职。

ExecutorService 继承了Executor,但是它仍然是一个接口,它多了一些方法:

  • void shutdown():关闭线程池,会等待任务执行完。
  • List<Runnable> shutdownNow():立刻关闭线程池,尝试停止所有正在积极执行的任务,停止等待任务的处理,并返回一个正在等待执行的任务列表(还没有执行的)
  • boolean isShutdown():判断线程池是不是已经关闭,但是可能线程还在执行。
  • boolean isTerminated():在执行shutdown/shutdownNow之后,所有的任务已经完成,这个状态就是true。
  • boolean awaitTermination(long timeout, TimeUnit unit):执行shutdown之后,阻塞等到terminated状态,除非超时或者被打断。
  • <T> Future<T> submit(Callable<T> task): 提交一个有返回值的任务,并且返回该任务尚未有结果的Future,调用future.get()方法,可以返回任务完成的时候的结果。
  • <T> Future<T> submit(Runnable task, T result):提交一个任务,传入返回结果,这个result没有什么作用,只是指定类型和一个返回的结果。
  • Future<?> submit(Runnable task): 提交任务,返回Future
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks):批量执行tasks,获取Future的list,可以批量提交任务。
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit):批量提交任务,并指定超时时间
  • <T> T invokeAny(Collection<? extends Callable<T>> tasks): 阻塞,获取第一个完成任务的结果值,
  • <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit):阻塞,获取第一个完成结果的值,指定超时时间

可能有同学对前面的<T> Future<T> submit(Runnable task, T result)有疑问,这个reuslt有什么作用?

其实它没有什么作用,只是持有它,任务完成后,还是调用 future.get()返回这个结果,用result new 了一个 ftask,其内部其实是使用了Runnable的包装类 RunnableAdapter,没有对result做特殊的处理,调用 call() 方法的时候,直接返回这个结果。(Executors 中具体的实现)

  1. public <T> Future<T> submit(Runnable task, T result) {
  2. if (task == null) throw new NullPointerException();
  3. RunnableFuture<T> ftask = newTaskFor(task, result);
  4. execute(ftask);
  5. return ftask;
  6. }
  7. static final class RunnableAdapter<T> implements Callable<T> {
  8. final Runnable task;
  9. final T result;
  10. RunnableAdapter(Runnable task, T result) {
  11. this.task = task;
  12. this.result = result;
  13. }
  14. public T call() {
  15. task.run();
  16. // 返回传入的结果
  17. return result;
  18. }
  19. }

还有一个方法值得一提:invokeAny(): 在 ThreadPoolExecutor中使用ExecutorService 中的方法 invokeAny() 取得第一个完成的任务的结果,当第一个任务执行完成后,会调用 interrupt() 方法将其他任务中断。

注意,ExecutorService是接口,里面都是定义,并没有涉及实现,而前面的讲解都是基于它的名字(规定的规范)以及它的普遍实现来说的。

可以看到 ExecutorService 定义的是线程池的一些操作,包括关闭,判断是否关闭,是否停止,提交任务,批量提交任务等等。

AbstractExecutorService 是一个抽象类,实现了 ExecutorService接口,这是大部分线程池的基本实现,定时的线程池先不关注,主要的方法如下:

不仅实现了submitinvokeAllinvokeAny 等方法,而且提供了一个 newTaskFor 方法用于构建 RunnableFuture 对象,那些能够获取到任务返回结果的对象都是通过 newTaskFor 来获取的。不展开里面所有的源码的介绍,仅以submit()方法为例:

  1. public Future<?> submit(Runnable task) {
  2. if (task == null) throw new NullPointerException();
  3. // 封装任务
  4. RunnableFuture<Void> ftask = newTaskFor(task, null);
  5. // 执行任务
  6. execute(ftask);
  7. // 返回 RunnableFuture 对象
  8. return ftask;
  9. }

但是在 AbstractExecutorService 是没有对最最重要的方法进行实现的,也就是 execute() 方法。线程池具体是怎么执行的,这个不同的线程池可以有不同的实现,一般都是继承 AbstractExecutorService (定时任务有其他的接口),我们最最常用的就是ThreadPoolExecutor

重点来了!!! ThreadPoolExecutor 一般就是我们平时常用到的线程池类,所谓创建线程池,如果不是定时线程池,就是使用它。

先看ThreadPoolExecutor的内部结构(属性):

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. // 状态控制,主要用来控制线程池的状态,是核心的遍历,使用的是原子类
  3. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  4. // 用来表示线程数量的位数(使用的是位运算,一部分表示线程的数量,一部分表示线程池的状态)
  5. // SIZE = 32 表示32位,那么COUNT_BITS就是29位
  6. private static final int COUNT_BITS = Integer.SIZE - 3;
  7. // 线程池的容量,也就是27位表示的最大值
  8. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  9. // 状态量,存储在高位,32位中的前3位
  10. // 111(第一位是符号位,1表示负数),线程池运行中
  11. private static final int RUNNING = -1 << COUNT_BITS;
  12. // 000
  13. private static final int SHUTDOWN = 0 << COUNT_BITS;
  14. // 001
  15. private static final int STOP = 1 << COUNT_BITS;
  16. // 010
  17. private static final int TIDYING = 2 << COUNT_BITS;
  18. // 011
  19. private static final int TERMINATED = 3 << COUNT_BITS;
  20. // 取出运行状态
  21. private static int runStateOf(int c) { return c & ~CAPACITY; }
  22. // 取出线程数量
  23. private static int workerCountOf(int c) { return c & CAPACITY; }
  24. // 用运行状态和线程数获取ctl
  25. private static int ctlOf(int rs, int wc) { return rs | wc; }
  26. // 任务等待队列
  27. private final BlockingQueue<Runnable> workQueue;
  28. // 可重入主锁(保证一些操作的线程安全)
  29. private final ReentrantLock mainLock = new ReentrantLock();
  30. // 线程的集合
  31. private final HashSet<Worker> workers = new HashSet<Worker>();
  32. // 在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(),
  33. // 传统线程的通信方式,Condition都可以实现,Condition和传统的线程通信没什么区别,Condition的强大之处在于它可以为多个线程间建立不同的Condition
  34. private final Condition termination = mainLock.newCondition();
  35. // 最大线程池大小
  36. private int largestPoolSize;
  37. // 完成的任务数量
  38. private long completedTaskCount;
  39. // 线程工厂
  40. private volatile ThreadFactory threadFactory;
  41. // 任务拒绝处理器
  42. private volatile RejectedExecutionHandler handler;
  43. // 非核心线程的存活时间
  44. private volatile long keepAliveTime;
  45. // 允许核心线程的超时时间
  46. private volatile boolean allowCoreThreadTimeOut;
  47. // 核心线程数
  48. private volatile int corePoolSize;
  49. // 工作线程最大容量
  50. private volatile int maximumPoolSize;
  51. // 默认的拒绝处理器(丢弃任务)
  52. private static final RejectedExecutionHandler defaultHandler =
  53. new AbortPolicy();
  54. // 运行时关闭许可
  55. private static final RuntimePermission shutdownPerm =
  56. new RuntimePermission("modifyThread");
  57. // 上下文
  58. private final AccessControlContext acc;
  59. // 只有一个线程
  60. private static final boolean ONLY_ONE = true;
  61. }

从上面的代码可以看出,用一个32位的对象保存线程池的状态以及线程池的容量,高3位是线程池的状态,而剩下的29位,则是保存线程的数量:

  1. // 状态量,存储在高位,32位中的前3位
  2. // 111(第一位是符号位,1表示负数),线程池运行中
  3. private static final int RUNNING = -1 << COUNT_BITS;
  4. // 000
  5. private static final int SHUTDOWN = 0 << COUNT_BITS;
  6. // 001
  7. private static final int STOP = 1 << COUNT_BITS;
  8. // 010
  9. private static final int TIDYING = 2 << COUNT_BITS;
  10. // 011
  11. private static final int TERMINATED = 3 << COUNT_BITS;

各种状态之间是不一样的,他们的状态之间变化如下:

  • RUNNING:运行状态,可以接受任务,也可以处理任务
  • SHUTDOWN:不可以接受任务,但是可以处理任务
  • STOP:不可以接受任务,也不可以处理任务,中断当前任务
  • TIDYING:所有线程停止
  • TERMINATED:线程池的最后状态

线程池,肯定得有池子,并且是放线程的地方,在 ThreadPoolExecutor 中表现为 Worker,这是内部类:

线程池其实就是 Worker (打工人,不断的领取任务,完成任务)的集合,这里使用的是 HashSet:

  1. private final HashSet<Worker> workers = new HashSet<Worker>();

Worker 怎么实现的呢?

Worker 除了继承了 AbstractQueuedSynchronizer,也就是 AQSAQS 本质上就是个队列锁,一个简单的互斥锁,一般是在中断或者修改 worker 状态的时候使用。

内部引入AQS,是为了线程安全,线程执行任务的时候,调用的是runWorker(Worker w),这个方法不是worker的方法,而是 ThreadPoolExecutor的方法。从下面的代码可以看出,每次修改Worker的状态的时候,都是线程安全的。Worker里面,持有了一个线程Thread,可以理解为是对线程的封装。

至于runWorker(Worker w)是怎么运行的?先保持这个疑问,后面详细讲解。

  1. // 实现 Runnable,封装了线程
  2. private final class Worker
  3. extends AbstractQueuedSynchronizer
  4. implements Runnable
  5. {
  6. // 序列化id
  7. private static final long serialVersionUID = 6138294804551838833L;
  8. // worker运行的线程
  9. final Thread thread;
  10. // 初始化任务,有可能是空的,如果任务不为空的时候,其他进来的任务,可以直接运行,不在添加到任务队列
  11. Runnable firstTask;
  12. // 线程任务计数器
  13. volatile long completedTasks;
  14. // 指定一个任务让工人忙碌起来,这个任务可能是空的
  15. Worker(Runnable firstTask) {
  16. // 初始化AQS队列锁的状态
  17. setState(-1); // 禁止中断直到 runWorker
  18. this.firstTask = firstTask;
  19. // 从线程工厂,取出一个线程初始化
  20. this.thread = getThreadFactory().newThread(this);
  21. }
  22. // 实际上运行调用的是runWorker
  23. public void run() {
  24. // 不断循环获取任务进行执行
  25. runWorker(this);
  26. }
  27. // 0表示没有被锁
  28. // 1表示被锁的状态
  29. protected boolean isHeldExclusively() {
  30. return getState() != 0;
  31. }
  32. // 独占,尝试获取锁,如果成功返回true,失败返回false
  33. protected boolean tryAcquire(int unused) {
  34. // CAS 乐观锁
  35. if (compareAndSetState(0, 1)) {
  36. // 成功,当前线程独占锁
  37. setExclusiveOwnerThread(Thread.currentThread());
  38. return true;
  39. }
  40. return false;
  41. }
  42. // 独占方式,尝试释放锁
  43. protected boolean tryRelease(int unused) {
  44. setExclusiveOwnerThread(null);
  45. setState(0);
  46. return true;
  47. }
  48. // 上锁,调用的是AQS的方法
  49. public void lock() { acquire(1); }
  50. // 尝试上锁
  51. public boolean tryLock() { return tryAcquire(1); }
  52. // 解锁
  53. public void unlock() { release(1); }
  54. // 是否锁住
  55. public boolean isLocked() { return isHeldExclusively(); }
  56. // 如果开始可就中断
  57. void interruptIfStarted() {
  58. Thread t;
  59. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
  60. try {
  61. t.interrupt();
  62. } catch (SecurityException ignore) {
  63. }
  64. }
  65. }
  66. }

除了放线程池的地方,要是任务很多,没有那么多线程,肯定需要一个地方放任务,充当缓冲作用,也就是任务队列,在代码中表现为:

  1. private final BlockingQueue<Runnable> workQueue;

计算机的内存总是有限的,我们不可能一直往队列里面增加内容,所以线程池为我们提供了选择,可以选择多种队列。同时当任务实在太多,占满了线程,并且把任务队列也占满的时候,我们需要做出一定的反应,那就是拒绝还是抛出错误,丢掉任务?丢掉哪些任务,这些都是可能需要定制的内容。

关于如何创建线程池,其实 ThreadPoolExecutor提供了构造方法,主要参数如下,不传的话会使用默认的:

  • 核心线程数:核心线程数,一般是指常驻的线程,没有任务的时候通常也不会销毁
  • 最大线程数:线程池允许创建的最大的线程数量
  • 非核心线程的存活时间:指的是没有任务的时候,非核心线程能够存活多久
  • 时间的单位:存活时间的单位
  • 存放任务的队列:用来存放任务
  • 线程工厂
  • 拒绝处理器:如果添加任务失败,将由该处理器处理
  1. // 指定核心线程数,最大线程数,非核心线程没有任务的存活时间,时间单位,任务队列
  2. public ThreadPoolExecutor(int corePoolSize,
  3. int maximumPoolSize,
  4. long keepAliveTime,
  5. TimeUnit unit,
  6. BlockingQueue<Runnable> workQueue) {
  7. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  8. Executors.defaultThreadFactory(), defaultHandler);
  9. }
  10. // 指定核心线程数,最大线程数,非核心线程没有任务的存活时间,时间单位,任务队列,线程池工厂
  11. public ThreadPoolExecutor(int corePoolSize,
  12. int maximumPoolSize,
  13. long keepAliveTime,
  14. TimeUnit unit,
  15. BlockingQueue<Runnable> workQueue,
  16. ThreadFactory threadFactory) {
  17. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  18. threadFactory, defaultHandler);
  19. }
  20. // 指定核心线程数,最大线程数,非核心线程没有任务的存活时间,时间单位,任务队列,拒绝任务处理器
  21. public ThreadPoolExecutor(int corePoolSize,
  22. int maximumPoolSize,
  23. long keepAliveTime,
  24. TimeUnit unit,
  25. BlockingQueue<Runnable> workQueue,
  26. RejectedExecutionHandler handler) {
  27. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  28. Executors.defaultThreadFactory(), handler);
  29. }
  30. // 最后其实都是调用了这个方法
  31. public ThreadPoolExecutor(int corePoolSize,
  32. int maximumPoolSize,
  33. long keepAliveTime,
  34. TimeUnit unit,
  35. BlockingQueue<Runnable> workQueue,
  36. ThreadFactory threadFactory,
  37. RejectedExecutionHandler handler) {
  38. ...
  39. }

其实,除了显示的指定上面的参数之外,JDK也封装了一些直接创建线程池的方法给我们,那就是Executors:

  1. // 固定线程数量的线程池,无界的队列
  2. public static ExecutorService newFixedThreadPool(int nThreads) {
  3. return new ThreadPoolExecutor(nThreads, nThreads,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>());
  6. }
  7. // 单个线程的线程池,无界的队列,按照任务提交的顺序,串行执行
  8. public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
  9. return new FinalizableDelegatedExecutorService
  10. (new ThreadPoolExecutor(1, 1,
  11. 0L, TimeUnit.MILLISECONDS,
  12. new LinkedBlockingQueue<Runnable>(),
  13. threadFactory));
  14. }
  15. // 动态调节,没有核心线程,全部都是普通线程,每个线程存活60s,使用容量为1的阻塞队列
  16. public static ExecutorService newCachedThreadPool() {
  17. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  18. 60L, TimeUnit.SECONDS,
  19. new SynchronousQueue<Runnable>());
  20. }
  21. // 定时任务线程池
  22. public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
  23. return new DelegatedScheduledExecutorService
  24. (new ScheduledThreadPoolExecutor(1));
  25. }

但是一般是不推荐使用上面别人封装的线程池的哈!!!

看完上面的创建参数大家可能会有点懵,但是没关系,一一为大家道来:

可以看出,当有任务进来的时候,先判断核心线程池是不是已经满了,如果还没有,将会继续创建线程。注意,如果一个任务进来,创建线程执行,执行完成,线程空闲下来,这时候再来一个任务,是会继续使用之前的线程,还是重新创建一个线程来执行呢?

答案是重新创建线程,这样线程池可以快速达到核心线程数的规模大小,以便快速响应后面的任务。

如果线程数量已经到达核心线程数,来了任务,线程池的线程又都不是空闲状态,那么就会判断队列是不是满的,倘若队列还有空间,那么就会把任务放进去队列中,等待线程领取执行。

如果任务队列已经满了,放不下任务,那么就会判断线程数是不是已经到最大线程数了,要是还没有到达,就会继续创建线程并执行任务,这个时候创建的是非核心部分线程。

如果已经到达最大线程数,那么就不能继续创建线程了,只能执行拒绝策略,默认的拒绝策略是丢弃任务,我们可以自定义拒绝策略。

值得注意的是,倘若之前任务比较多,创建出了一些非核心线程,那么任务少了之后,领取不到任务,过了一定时间,非核心线程就会销毁,只剩下核心线程池的数量的线程。这个时间就是前面说的keepAliveTime

提交任务,我们看execute(),会先获取线程池的状态和个数,要是线程个数还没达到核心线程数,会直接添加线程,否则会放到任务队列,如果任务队列放不下,会继续增加线程,但是不是增加核心线程。

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. // 获取状态和个数
  5. int c = ctl.get();
  6. // 如果个数小于核心线程数
  7. if (workerCountOf(c) < corePoolSize) {
  8. // 直接添加
  9. if (addWorker(command, true))
  10. return;
  11. // 添加失败则继续获取
  12. c = ctl.get();
  13. }
  14. // 判断线程池状态是不是运行中,任务放到队列中
  15. if (isRunning(c) && workQueue.offer(command)) {
  16. // 再次检查
  17. int recheck = ctl.get();
  18. // 判断线程池是不是还在运行
  19. if (! isRunning(recheck) && remove(command))
  20. // 如果不是,那么就拒绝并移除任务
  21. reject(command);
  22. else if (workerCountOf(recheck) == 0)
  23. // 如果线程数为0,并且还在运行,那么就直接添加
  24. addWorker(null, false);
  25. }else if (!addWorker(command, false))
  26. // 添加任务队列失败,拒绝
  27. reject(command);
  28. }

上面的源码中,调用了一个重要的方法:addWorker(Runnable firstTask, boolean core),该方法主要是为了增加工作的线程,我们来看看它是如何执行的:

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. // 回到当前位置重试
  3. retry:
  4. for (;;) {
  5. // 获取状态
  6. int c = ctl.get();
  7. int rs = runStateOf(c);
  8. // 大于SHUTDOWN说明线程池已经停止
  9. // ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) 表示三个条件至少有一个不满足
  10. // 不等于SHUTDOWN说明是大于shutdown
  11. // firstTask != null 任务不是空的
  12. // workQueue.isEmpty() 队列是空的
  13. if (rs >= SHUTDOWN &&
  14. ! (rs == SHUTDOWN &&
  15. firstTask == null &&
  16. ! workQueue.isEmpty()))
  17. return false;
  18. for (;;) {
  19. // 工作线程数
  20. int wc = workerCountOf(c);
  21. // 是否符合容量
  22. if (wc >= CAPACITY ||
  23. wc >= (core ? corePoolSize : maximumPoolSize))
  24. return false;
  25. // 添加成功,跳出循环
  26. if (compareAndIncrementWorkerCount(c))
  27. break retry;
  28. c = ctl.get(); // Re-read ctl
  29. // cas失败,重新尝试
  30. if (runStateOf(c) != rs)
  31. continue retry;
  32. // else CAS failed due to workerCount change; retry inner loop
  33. }
  34. }
  35. // 前面线程计数增加成功
  36. boolean workerStarted = false;
  37. boolean workerAdded = false;
  38. Worker w = null;
  39. try {
  40. // 创建了一个worker,包装了任务
  41. w = new Worker(firstTask);
  42. final Thread t = w.thread;
  43. // 线程创建成功
  44. if (t != null) {
  45. // 获取锁
  46. final ReentrantLock mainLock = this.mainLock;
  47. mainLock.lock();
  48. try {
  49. // 再次确认状态
  50. int rs = runStateOf(ctl.get());
  51. if (rs < SHUTDOWN ||
  52. (rs == SHUTDOWN && firstTask == null)) {
  53. // 如果线程已经启动,失败
  54. if (t.isAlive()) // precheck that t is startable
  55. throw new IllegalThreadStateException();
  56. // 新增线程到集合
  57. workers.add(w);
  58. // 获取大小
  59. int s = workers.size();
  60. // 判断最大线程池数量
  61. if (s > largestPoolSize)
  62. largestPoolSize = s;
  63. // 已经添加工作线程
  64. workerAdded = true;
  65. }
  66. } finally {
  67. // 解锁
  68. mainLock.unlock();
  69. }
  70. // 如果已经添加
  71. if (workerAdded) {
  72. // 启动线程
  73. t.start();
  74. workerStarted = true;
  75. }
  76. }
  77. } finally {
  78. // 如果没有启动
  79. if (! workerStarted)
  80. // 失败处理
  81. addWorkerFailed(w);
  82. }
  83. return workerStarted;
  84. }

前面在介绍Worker这个类的时候,我们讲解到其实它的run()方法调用的是外部的runWorker()方法,那么我们来看看runWorkder()方法:

首先,它会直接处理自己的firstTask,这个任务并没有在任务队列里面,而是它自己持有的:

  1. final void runWorker(Worker w) {
  2. // 当前线程
  3. Thread wt = Thread.currentThread();
  4. // 第一个任务
  5. Runnable task = w.firstTask;
  6. // 重置为null
  7. w.firstTask = null;
  8. // 允许打断
  9. w.unlock();
  10. boolean completedAbruptly = true;
  11. try {
  12. // 任务不为空,或者获取的任务不为空
  13. while (task != null || (task = getTask()) != null) {
  14. // 加锁
  15. w.lock();
  16. //如果线程池停止,确保线程被中断;
  17. //如果不是,确保线程没有被中断。这
  18. //在第二种情况下需要复查处理
  19. // shutdown - now竞赛同时清除中断
  20. if ((runStateAtLeast(ctl.get(), STOP) ||
  21. (Thread.interrupted() &&
  22. runStateAtLeast(ctl.get(), STOP))) &&
  23. !wt.isInterrupted())
  24. wt.interrupt();
  25. try {
  26. // 执行之前回调方法(可以由我们自己实现)
  27. beforeExecute(wt, task);
  28. Throwable thrown = null;
  29. try {
  30. // 执行任务
  31. task.run();
  32. } catch (RuntimeException x) {
  33. thrown = x; throw x;
  34. } catch (Error x) {
  35. thrown = x; throw x;
  36. } catch (Throwable x) {
  37. thrown = x; throw new Error(x);
  38. } finally {
  39. // 执行之后回调方法
  40. afterExecute(task, thrown);
  41. }
  42. } finally {
  43. // 置为null
  44. task = null;
  45. // 更新完成任务
  46. w.completedTasks++;
  47. w.unlock();
  48. }
  49. }
  50. // 完成
  51. completedAbruptly = false;
  52. } finally {
  53. // 处理线程退出相关工作
  54. processWorkerExit(w, completedAbruptly);
  55. }
  56. }

上面可以看到如果当前的任务是null,会去获取一个task,我们看看getTask(),里面涉及到了两个参数,一个是是不是允许核心线程销毁,另外一个是线程数是不是大于核心线程数,如果满足条件,就从队列中取出任务,如果超时取不到,那就返回空,表示没有取到任务,没有取到任务,就不会执行前面的循环,就会触发线程销毁processWorkerExit()等工作。

  1. private Runnable getTask() {
  2. // 是否超时
  3. boolean timedOut = false; // Did the last poll() time out?
  4. for (;;) {
  5. int c = ctl.get();
  6. int rs = runStateOf(c);
  7. // SHUTDOWN状态继续处理队列中的任务,但是不接收新的任务
  8. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  9. decrementWorkerCount();
  10. return null;
  11. }
  12. // 线程数
  13. int wc = workerCountOf(c);
  14. // 是否允许核心线程超时或者线程数大于核心线程数
  15. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  16. if ((wc > maximumPoolSize || (timed && timedOut))
  17. && (wc > 1 || workQueue.isEmpty())) {
  18. // 减少线程成功,就返回null,后面由processWorkerExit()处理
  19. if (compareAndDecrementWorkerCount(c))
  20. return null;
  21. continue;
  22. }
  23. try {
  24. // 如果允许核心线程关闭,或者超过了核心线程,就可以在超时的时间内获取任务,或者直接取出任务
  25. Runnable r = timed ?
  26. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  27. workQueue.take();
  28. // 如果能取到任务,那就肯定可以执行
  29. if (r != null)
  30. return r;
  31. // 否则就获取不到任务,超时了
  32. timedOut = true;
  33. } catch (InterruptedException retry) {
  34. timedOut = false;
  35. }
  36. }
  37. }

前面提到,如果线程当前任务为空,又允许核心线程销毁,或者线程超过了核心线程数,等待了一定时间,超时了却没有从任务队列获取到任务的话,就会跳出循环执行到后面的线程销毁(结束)程序。那销毁线程的时候怎么做呢?

  1. private void processWorkerExit(Worker w, boolean completedAbruptly) {
  2. // 如果是突然结束的线程,那么之前的线程数是没有调整的,这里需要调整
  3. if (completedAbruptly)
  4. decrementWorkerCount();
  5. // 获取锁
  6. final ReentrantLock mainLock = this.mainLock;
  7. mainLock.lock();
  8. try {
  9. // 完成的任务数
  10. completedTaskCount += w.completedTasks;
  11. // 移除线程
  12. workers.remove(w);
  13. } finally {
  14. // 解锁
  15. mainLock.unlock();
  16. }
  17. // 试图停止
  18. tryTerminate();
  19. // 获取状态
  20. int c = ctl.get();
  21. // 比stop小,至少是shutdown
  22. if (runStateLessThan(c, STOP)) {
  23. // 如果不是突然完成
  24. if (!completedAbruptly) {
  25. // 最小值要么是0,要么是核心线程数,要是允许核心线程超时销毁,那么就是0
  26. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
  27. // 如果最小的是0或者队列不是空的,那么保留一个线程
  28. if (min == 0 && ! workQueue.isEmpty())
  29. min = 1;
  30. // 只要大于等于最小的线程数,就结束当前线程
  31. if (workerCountOf(c) >= min)
  32. return; // replacement not needed
  33. }
  34. // 否则的话,可能还需要新增工作线程
  35. addWorker(null, false);
  36. }
  37. }

停止线程池可以使用shutdown()或者shutdownNow()shutdown()可以继续处理队列中的任务,而shutdownNow()会立即清理任务,并返回未执行的任务。

  1. public void shutdown() {
  2. // 获取锁
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. // 检查停止权限
  7. checkShutdownAccess();
  8. // 更新状态
  9. advanceRunState(SHUTDOWN);
  10. // 中断所有线程
  11. interruptIdleWorkers();
  12. // 回调钩子
  13. onShutdown(); // hook for ScheduledThreadPoolExecutor
  14. } finally {
  15. mainLock.unlock();
  16. }
  17. tryTerminate();
  18. }
  19. // 立刻停止
  20. public List<Runnable> shutdownNow() {
  21. List<Runnable> tasks;
  22. // 获取锁
  23. final ReentrantLock mainLock = this.mainLock;
  24. mainLock.lock();
  25. try {
  26. // 检查停止权限
  27. checkShutdownAccess();
  28. // 更新状态到stop
  29. advanceRunState(STOP);
  30. // 中断所有线程
  31. interruptWorkers();
  32. // 清理队列
  33. tasks = drainQueue();
  34. } finally {
  35. mainLock.unlock();
  36. }
  37. tryTerminate();
  38. // 返回任务列表(未完成)
  39. return tasks;
  40. }
  • execute() 方法可以提交不需要返回值的任务,无法判断任务是否被线程池执行是否成功
  • submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个对象,我们调用get()方法就可以阻塞,直到获取到线程执行完成的结果,同时我们也可以使用有超时时间的等待方法get(long timeout,TimeUnit unit),这样不管线程有没有执行完成,如果到时间,也不会阻塞,直接返回null。返回的是RunnableFuture对象,继承了Runnable, Future<V>两个接口:
  1. public interface RunnableFuture<V> extends Runnable, Future<V> {
  2. /**
  3. * Sets this Future to the result of its computation
  4. * unless it has been cancelled.
  5. */
  6. void run();
  7. }

阻塞队列,首先是一个队列,肯定具有先进先出的属性。

而阻塞,则是这个模型的演化,一般队列,可以用在生产消费者模型,也就是数据共享,有人往里面放任务,有人不断的往里面取出任务,这是一个理想的状态。

但是倘若不理想,产生任务和消费任务的速度不一样,要是任务放在队列里面比较多,消费比较慢,还可以慢慢消费,或者生产者得暂停一下产生任务(阻塞生产者线程)。可以使用 offer(E o, long timeout, TimeUnit unit)设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败,也可以使用put(Object),将对象放到阻塞队列里面,如果没有空间,那么这个方法会阻塞到有空间才会放进去。

如果消费速度快,生产者来不及生产,获取任务的时候,可以使用poll(time),有数据则直接取出来,没数据则可以等待time时间后,返回null。也可以使用take()取出第一个任务,没有任务就会一直阻塞到队列有任务为止。

上面说了阻塞队列的属性,那么为啥要用呢?

  • 如果产生任务,来了就往队列里面放,资源很容易被耗尽。
  • 创建线程需要获取锁,这个一个线程池的全局锁,如果各个线程不断的获取锁,解锁,线程上下文切换之类的开销也比较大,不如在队列为空的时候,然一个线程阻塞等待。

  • ArrayBlockingQueue:基于数组实现,内部有一个定长的数组,同时保存着队列头和尾部的位置。
  • LinkedBlockingQueue:基于链表的阻塞对垒,生产者和消费者使用独立的锁,并行能力强,如果不指定容量,默认是无效容量,容易系统内存耗尽。
  • DelayQueue:延迟队列,没有大小限制,生产数据不会被阻塞,消费数据会,只有指定的延迟时间到了,才能从队列中获取到该元素。
  • PriorityBlockingQueue:基于优先级的阻塞队列,按照优先级进行消费,内部控制同步的是公平锁。
  • SynchronousQueue:没有缓冲,生产者直接把任务交给消费者,少了中间的缓存区。

前面的源码分析,其实已经讲解过这个问题了,线程池的线程调用的run()方法,其实调用的是runWorker(),里面是死循环,除非获取不到任务,如果没有了任务firstTask并且从任务队列中获取不到任务,超时的时候,会再判断是不是可以销毁核心线程,或者超过了核心线程数,满足条件的时候,才会让当前的线程结束。

否则,一直都在一个循环中,不会结束。

我们知道start()方法只能调用一次,因此调用到run()方法的时候,调用外面的runWorker(),让其在runWorker()的时候,不断的循环,获取任务。获取到任务,调用任务的run()方法。

执行完成的线程会调用processWorkerExit(),前面有分析,里面会获取锁,把线程数减少,从工作线程从集合中移除,移除掉之后,会判断线程是不是太少了,如果是,会再加回来,个人以为是一种补救。

一般而言,有个公式,如果是计算(CPU)密集型的任务,那么核心线程数设置为处理器核数-1,如果是io密集型(很多网络请求),那么就可以设置为2*处理器核数。但是这并不是一个银弹,一切要从实际出发,最好就是在测试环境进行压测,实践出真知,并且很多时候一台机器不止一个线程池或者还会有其他的线程,因此参数不可设置得太过饱满。

一般 8 核的机器,设置 10-12 个核心线程就差不多了,这一切必须按照业务具体值进行计算。设置过多的线程数,上下文切换,竞争激烈,设置过少,没有办法充分利用计算机的资源。

计算(CPU)密集型消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

io密集型系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

阿里的编程规范里面,不建议使用默认的方式来创建线程,是因为这样创建出来的线程很多时候参数都是默认的,可能创建者不太了解,很容易出问题,最好通过new ThreadPoolExecutor()来创建,方便控制参数。默认的方式创建的问题如下:

  • Executors.newFixedThreadPool():无界队列,内存可能被打爆
  • Executors.newSingleThreadExecutor():单个线程,效率低,串行。
  • Executors.newCachedThreadPool():没有核心线程,最大线程数可能为无限大,内存可能还会爆掉。

使用具体的参数创建线程池,开发者必须了解每个参数的作用,不会胡乱设置参数,减少内存溢出等问题。

一般体现在几个问题:

  • 任务队列怎么设置?
  • 核心线程多少个?
  • 最大线程数多少?
  • 怎么拒绝任务?
  • 创建线程的时候没有名称,追溯问题不好找。

线程池一般有以下四种拒绝策略,其实我们可以从它的内部类看出来:

  • AbortPolicy: 不执行新的任务,直接抛出异常,提示线程池已满
  • DisCardPolicy:不执行新的任务,但是也不会抛出异常,默默的
  • DisCardOldSetPolicy:丢弃消息队列中最老的任务,变成新进来的任务
  • CallerRunsPolicy:直接调用当前的execute来执行任务

一般而言,上面的拒绝策略都不会特别理想,一般要是任务满了,首先需要做的就是看任务是不是必要的,如果非必要,非核心,可以考虑拒绝掉,并报错提醒,如果是必须的,必须把它保存起来,不管是使用mq消息,还是其他手段,不能丢任务。在这些过程中,日志是非常必要的。既要保护线程池,也要对业务负责。

线程池提供了一些API,可以动态获取线程池的状态,并且还可以设置线程池的参数,以及状态:

查看线程池的状态:

修改线程池的状态:

关于这一点,美团的线程池文章讲得很清楚,甚至做了一个实时调整线程池参数的平台,可以进行跟踪监控,线程池活跃度、任务的执行Transaction(频率、耗时)、Reject异常、线程池内部统计信息等等。这里我就不展开了,原文:https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html ,这是我们可以参考的思路。

线程隔离,很多同学可能知道,就是不同的任务放在不同的线程里面运行,而线程池隔离,一般是按照业务类型来隔离,比如订单的处理线程放在一个线程池,会员相关的处理放在一个线程池。

也可以通过核心和非核心来隔离,核心处理流程放在一起,非核心放在一起,两个使用不一样的参数,不一样的拒绝策略,尽量保证多个线程池之间不影响,并且最大可能保住核心线程的运行,非核心线程可以忍受失败。

Hystrix里面运用到这个技术,Hystrix的线程隔离技术,来防止不同的网络请求之间的雪崩,即使依赖的一个服务的线程池满了,也不会影响到应用程序的其他部分。

秦怀,公众号【秦怀杂货店】作者,技术之路不在一时,山高水长,纵使缓慢,驰而不息。个人写作方向:Java源码解析,JDBC,Mybatis,Spring,redis,分布式,剑指Offer,LeetCode等,认真写好每一篇文章,不喜欢标题党,不喜欢花里胡哨,大多写系列文章,不能保证我写的都完全正确,但是我保证所写的均经过实践或者查找资料。遗漏或者错误之处,还望指正。

2020年我写了什么?

开源编程笔记

版权声明:本文为Damaer原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/Damaer/p/14914813.html