线程池简单探索
为什么使用线程池
- 第一点,线程池可以解决线程生命周期的系统开销问题,同时还可以加快响应速度。因为线程池中的线程是可以复用的,我们只用少量的线程去执行大量的任务,这就大大减小了线程生命周期的开销。而且线程通常不是等接到任务后再临时创建,而是已经创建好时刻准备执行任务,这样就消除了线程创建所带来的延迟,提升了响应速度,增强了用户体验。
- 第二点,线程池可以统筹内存和 CPU 的使用,避免资源使用不当。线程池会根据配置和任务数量灵活地控制线程数量,不够的时候就创建,太多的时候就回收,避免线程过多导致内存溢出,或线程太少导致 CPU 资源浪费,达到了一个完美的平衡。
- 第三点,线程池可以统一管理资源。比如线程池可以统一管理任务队列和线程,可以统一开始或结束任务,比单个线程逐一处理任务要更方便、更易于管理,同时也有利于数据统计,比如我们可以很方便地统计出已经执行过的任务的数量。
线程池状态
- 运行(RUNNING):该状态下的线程池接收新任务并处理队列中的任务;线程池创建完毕就处于该状态,也就是正常状态;
- 关机(SHUTDOWN):线程池不接受新任务,但处理队列中的任务;线程池调用shutdown()之后的池状态;
- 停止(STOP):线程池不接受新任务,也不处理队列中的任务,并中断正在执行的任务;线程池调用shutdownNow()之后的池状态;
- 清理(TIDYING):线程池所有任务已经终止,workCount(当前线程数)为0;过渡到清理状态的线程将运行terminated()钩子方法;
- 终止(TERMINATED):terminated()方法结束后的线程池状态;
继承树
Executors
创建线程池
风险
ExecutorService
ThreadPoolExecutor
工作原理
构造器
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
- 第一种情况是当我们调用 shutdown 等方法关闭线程池后,即便此时可能线程池内部依然有没执行完的任务正在执行,但是由于线程池已经关闭,此时如果再向线程池内提交任务,就会遭到拒绝。
- 第二种情况是线程池没有能力继续处理新提交的任务,也就是工作已经非常饱和的时候。
- 第一种拒绝策略是 AbortPolicy,这种拒绝策略在拒绝任务时,会直接抛出一个类型为 RejectedExecutionException 的 RuntimeException,让你感知到任务被拒绝了,于是你便可以根据业务逻辑选择重试或者放弃提交等策略。
- 第二种拒绝策略是 DiscardPolicy,这种拒绝策略正如它的名字所描述的一样,当新任务被提交后直接被丢弃掉,也不会给你任何的通知,相对而言存在一定的风险,因为我们提交的时候根本不知道这个任务会被丢弃,可能造成数据丢失。
- 第三种拒绝策略是 DiscardOldestPolicy,如果线程池没被关闭且没有能力执行,则会丢弃任务队列中的头结点,通常是存活时间最长的任务,这种策略与第二种不同之处在于它丢弃的不是最新提交的,而是队列中存活时间最长的,这样就可以腾出空间给新提交的任务,但同理它也存在一定的数据丢失风险。
-
第四种拒绝策略是 CallerRunsPolicy,相对而言它就比较完善了,当有新任务提交后,如果线程池没被关闭且没有能力执行,则把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务。这样做主要有两点好处。
- 第一点新提交的任务不会被丢弃,这样也就不会造成业务损失。
- 第二点好处是,由于谁提交任务谁就要负责执行任务,这样提交任务的线程就得负责执行任务,而执行任务又是比较耗时的,在这段期间,提交任务的线程被占用,也就不会再提交新的任务,减缓了任务提交的速度,相当于是一个负反馈。在此期间,线程池中的线程也可以充分利用这段时间来执行掉一部分任务,腾出一定的空间,相当于是给了线程池一定的缓冲期。
构造流程
总结出线程池的几个特点。
- 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加线程。
- 线程池只有在任务队列填满时才创建多于 corePoolSize 的线程,如果使用的是无界队列(例如 LinkedBlockingQueue),那么由于队列不会满,所以线程数不会超过 corePoolSize。
- 通过设置 corePoolSize 和 maximumPoolSize 为相同的值,就可以创建固定大小的线程池。
Worker
Worker如何绑定一个线程?
/** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { //防止在runWorker之前被中断,因为worker一旦建立就会加入workers集合中 //其他线程可能会中断空闲线程 //而空闲线程的依据就是能否获得worker的锁 setState(-1); //设置初始任务,注意这里没有null检查,故初始任务可以为空 this.firstTask = firstTask; //通过ThreadPoolExecutor的提供线程工厂来创建线程,并把自身赋值给它,作为其线程任务 //保留线程引用,用于中断线程 this.thread = getThreadFactory().newThread(this); }
Worker绑定的线程何时启动?
private boolean addWorker(Runnable firstTask, boolean core) { //core表示是否是核心线程 //先试图改变控制信息内 工作线程数 的值 retry: for (;;) { //获得控制信息 int c = ctl.get(); //从控制信息内 获取线程池运行状态 int rs = runStateOf(c); //如果已经SHUTDOWN或者STOP则不再添加新工作线程 //除非,在SHUTDOWN状态下,有任务尚未完成,不接受新任务 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //从控制信息内获取 工作线程数 int wc = workerCountOf(c); //工作线程已经超过容量 或 //核心线程,超过核心线程数 //非核心线程,超过最大线程数 //不得添加新线程 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //CAS改变控制信息内 工作线程数的值 +1 ,并结束自旋 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; //worker线程是否已经启动 boolean workerAdded = false; //worker线程是否已加入workers集合 Worker w = null; try { w = new Worker(firstTask); //创建新线程,把初始任务赋值给它 final Thread t = w.thread; //获取Worker的线程引用 if (t != null) { //因为要修改集合HashSet,故需获取线程池的锁,以保证线程安全 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //获取锁后再次检查状态,有可能在获得锁之前,线程池已经被shutdown了 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) //提前检查线程能否start throw new IllegalThreadStateException(); //把worker对象加入workers集合 workers.add(w); int s = workers.size(); //更新largetstPoolSize,此字段表示线程池运行时,最多开启过多少个线程 if (s > largestPoolSize) largestPoolSize = s; //线程已加入集合,如果前面出现异常,这里不会被执行 workerAdded = true; } } finally { mainLock.unlock(); } //如果添加成功,则启动线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { //如果启动失败了,则表示添加Worker失败,回滚 if (! workerStarted) //这个方法,会把前面添加到workers集合中的对应worker删除 //并且把前面更新的 控制信息内的工作线程数再减回来 addWorkerFailed(w); } return workerStarted; }
Worker的执行
final void runWorker(Worker w) { //获得当前执行这段代码的线程 Thread wt = Thread.currentThread(); //先尝试从worker取得初始任务 Runnable task = w.firstTask; w.firstTask = null; //允许中断,unlock后state=1,中断方法获取到锁,则判断为空闲线程,可中断 w.unlock(); boolean completedAbruptly = true; try { //不断地取任务执行、 其中getTask提供阻塞。如果getTask返回null则退出循环 while (task != null || (task = getTask()) != null) { //获取锁,标识此线程正在工作,非空闲线程 w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //钩子函数,空实现,子类可根据需要进行实现 beforeExecute(wt, task); Throwable thrown = null; try { //运行获取到的任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //钩子函数 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } //如果因为异常退出,这段语句不会被执行,也就是说completedAbruptly==true completedAbruptly = false; } finally { //工作线程退出的处理操作,如获取当前worker完成的任务量 //如果异常退出,还需弥补,补充工作线程等等 processWorkerExit(w, completedAbruptly); } }
线程安全问题
public void shutdown() { final ReentrantLock mainLock = this.mainLock; //获取线程池锁 mainLock.lock(); try { //检查执行线程是否有权关闭线程池 checkShutdownAccess(); //更改线程池运行状态为SHUTDOWN advanceRunState(SHUTDOWN); //中断空闲线程 interruptIdleWorkers(); //钩子函数 onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); }
private void interruptIdleWorkers(boolean onlyOne) { //获取线程池的锁,保持独占访问 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //遍历workers集合中的所有工作线程 for (Worker w : workers) { //获得worker对象中的线程引用 Thread t = w.thread; //如果获得锁成功,则中断对应线程 //如果工作线程正在执行任务,因为开始执行前,任务会获取worker的锁,故其无法被中断 //如果工作线程正在等待任务,因其没获得锁,则当前线程可以获得其worker的锁,此工作线程被中断 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } //如果只需要关闭一个工作线程,则到此为止 if (onlyOne) break; } } finally { mainLock.unlock(); } }
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //更改线程池运行状态为STOP advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //遍历workers集合中的所有工作线程 for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }
void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }