线程池--ThreadPoolExecutor
线程池的实现原理
- 1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤
需要获取全局锁)。 - 2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
- 3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执
行这一步骤需要获取全局锁)。 - 4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用
RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
线程池的使用
线程池的创建
可以通过ThreadPoolExecutor来创建一个线程池:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
1)corePoolSize(线程池的基本大小):
当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。
2)runnableTaskQueue(任务队列):
用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。
-
ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
-
LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
-
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
-
PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
3)maximumPoolSize(线程池最大数量):
线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果。
4)ThreadFactory(线程工厂):
用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字,代码如下。
new ThreadFactoryBuilder().setNameFormat(“XX-task-%d”).build();
5)RejectedExecutionHandler(饱和策略):
当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK1.5中Java线程池框架提供了以下4种策略。
- AbortPolicy:直接抛出异常。
- CallerRunsPolicy:只用调用者所在线程来运行任务。
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
- DiscardPolicy:不处理,丢弃掉。
向线程池提交任务
可以使用两个方法向线程池提交任务,分别为execute()和submit()方法。
threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});
execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。通过以上代码可知execute()方法输入的任务是一个Runnable类的实例。
Future<Object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 处理中断异常
} catch (ExecutionException e) {
// 处理无法执行任务异常
} finally {
// 关闭线程池
executor.shutdown();
}
submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个
future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
看下 AbstractExecutorService 的实现:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
submit方法最终都是转换一下,调用execute方法。其接收 RunnableFuture 类型的参数,FutureTask 实现了 RunnableFuture 接口,通过 newTaskFor 方法将 Runnable 和 Callable 都转换为 FutureTask。
对于Runnable,会调用Executors.callable方法,利用适配器模式,将call方法的调用委托给run方法。
最终的FutureTask都会持有一个callable对象。上面execute方法接收的就是Runnable类型的参数,实际传递的就是FutureTask。
任务的执行
线程池状态
线程池状态由一个整型的原子变量来表示,包括工作线程数和运行状态,为了用这样一个字段表示两个内容,32位的整型变量前三位用于状态标识,后29位可用来表示(2^29)-1个线程。
运行状态有以下几种:
- RUNNING: 接收新的任务并且处理队列里的任务
- SHUTDOWN: 不接受新的任务,但是处理队列里的任务
- STOP: 不接受新的任务,也不处理队列里的任务,并中断正在进行的任务
- TIDYING: 所有任务都已终止,workerCount为零,转换到状态 TIDYING 的线程将运行terminate()钩子方法
- TERMINATED: terminated() 方法已经执行
然后通过下面的两个方法分别取出二进制中各部分指定位数的长度,其他位置充零,用来表示各自的含义。
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
private static int workerCountOf(int c) {
return c & CAPACITY;
}
比如下面五种结果分别表示RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED:
101xxxxxxxxxxxxxxxxxxxxxxxxxxxxx -> 101 00000000000000000000000000000
000xxxxxxxxxxxxxxxxxxxxxxxxxxxxx -> 000 00000000000000000000000000000
001xxxxxxxxxxxxxxxxxxxxxxxxxxxxx -> 001 00000000000000000000000000000
010xxxxxxxxxxxxxxxxxxxxxxxxxxxxx -> 010 00000000000000000000000000000
011xxxxxxxxxxxxxxxxxxxxxxxxxxxxx -> 011 00000000000000000000000000000
前面带符号位的3位表示状态,后面29位表示线程数,经过与运算后面直接都是0了。刚好跟-1,0,1,2,3这个五个数左移29位一样:
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
执行步骤
向线程池提交任务后,后面就会调用 execute 方法来执行任务:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 首先获取线程池内现有线程数。如果少于核心线程数,
* 则将当前任务组建一个新 worker。添加 worker 成功则返回。
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 如果 worker 添加失败,则再次获取原子变量ctl
c = ctl.get();
}
// 如果是 RUNNING 状态,则往 work 队列中插入任务。
if (isRunning(c) && workQueue.offer(command)) {
/*
* 如果插入成功,还要再次检查是否 RUNNING 状态,如果非
* RUNNING 状态,则要移除此任务,移除成功还要执行拒绝方法。
*/
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
/*
* 是 RUNNING 状态或者非 RUNNING 状态但是 remove 任务失败,
* 则检查工作者线程数是否为0,是的话添加一个不执行的 worker。
* 此时没有传入任务,因为任务已经加入队列中,后面将从队列取出执行
*/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
* 线程池不是 RUNNING 状态或者是 RUNNING 状态但往队列加入任务失败(即队列已满),
* 再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程
* 数量的上限设置为maximumPoolSize,如果 addWorker 失败则执行拒绝策略
*/
else if (!addWorker(command, false))
reject(command);
}
关于 Worker
addWorker 方法包含两个参数,第一个是新线程首先要执行的任务,第二个是是否使用 corePoolSize 作为边界,如果不是则用 maximumPoolSize 作为线程数量的边界值。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
/*
* 如果是[RUNNING]状态或者是[SHUTDOWN]状态并且 firstTask 为 null
* 因为在SHUTDOWN时不会在添加新的任务,但还是会执行 workQueue中的任务
*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
从 addWorker 方法中可以看出,首先会进行一系列状态的判断,满足条件则创建一个 Worker,传入 firstTask 作为首要执行的任务:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 创建的线程默认传入当前 Worker, Worker 实现了 Runnable
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
//......
}
Worker 的构造函数中还将 state 设置为 -1,这个 state 继承自AQS,注释说是为了禁止在 runWorker 方法调用之前中断。而 tryTerminate() 和 shutDown() 方法都会调用 interruptIdleWorkers() 方法:
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
这个方法会判断当前 Worker 里的线程是否已中断,如果没有中断,调用 tryLock 方法尝试获取锁,如果能获取表示该线程没有正在处理的任务,则将该线程中断。而 tryLock 方法又会调用 tryAcquire 方法:
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
这个就是要把 state 从 0 改为 1,如果之前创建的 Worker 不设置 state 为 -1 的话,刚创建的 Worker 可能就要挂掉了。
通过线程工厂创建线程,Worker 实现了 Runnable,传入当前 Worker,这样线程 start() 的时候调用的就是该 Worker 的 run 方法,run 方法又调用了 runWorker 方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
// 是否遇到异常事件(遭遇突兀)
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
获取该 Worker 的 firstTask,如果不为空则会调用其 run 方法,为空的话则取队列 workQueue 中的 task 来执行:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
/*
* allowCoreThreadTimeOut 默认是false,也就是核心线程不允许进行超时
* 如果不允许超时并且当前线程池中线程大于核心线程的话都要超时控制
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
* a.如果当前线程大于最大线程数,或者小于最大线程数但是需要超时控制并且已超时,
* 然后当前线程数大于1或者队列为空,
* 满足条件a就比较并减少线程数量,成功的话返回null,否则继续for循环。
*
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 不满足条件a,如果需要超时控制则通过 poll() 方法从队列中获取任务,不需要则直接使用阻塞方法 take() 取
// 如果 keepAliveTime 时间获取到任务则返回该任务,没有获取到任务则标记 timeOut 为 true
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
第一个条件判断状态为非 RUNNING,并且线程池正在终止或者任务队列为空了,这个时候就要将 workerCount 减1并返回 null。
第二个条件判断在工作线程大于最大线程数时,或者需要超时控制并且第一次取任务已经超时了,满足以上两个条件之一则判断工作线程数大于1任务或者队列为空,则将工作线程数减一,成功则返回 null,否则继续循环。
这里在队列中获取任务的同时关注线程池状态和当前工作线程数,还有任务队列为否为空,也就是在看究竟要不要这么多线程。
结合上面的 runWorker 方法,如果这里 getTask() 为 null 则跳出循环,执行processWorkerExit()方法:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1
if (completedAbruptly) // If abrupt, then workerCount wasn\'t adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 从workers中移除,也就表示着从线程池中移除了一个工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
总结
-
以 execute 方法把任务送进去,这时可能创建一个新线程来执行任务,也可能只是把任务丢进了任务队列,等待其他线程执行完后来领任务。还可能因为达到了线程池上限被拒绝。
-
线程和 Worker 绑定在一起,每个 Worker 都会创建新线程,Worker 的执行是一个长期的过程,它在 execute 方法中被创建,但它的生命周期却不限于 execute 方法中。
-
addWorker 方法创建 Worker,也会触发 Worker 中的 thread.start(), Worker 本身实现了 Runnable 并放到了自己的 Thread 构造方法中,然后 start 会调用 Worker 实现的 run 方法,进一步调用线程池的 runWorker 方法。
-
runWorker 方法会让线程在执行完任务后循环使用,不断地去任务队列领取新任务,如果获取不到任务了,就要处理退出了,调用 processWorkerExit 完成善后处理,线程能重复利用靠的就是循环获取任务来延长其生命周期,退出循环其实已近标记它走线衰亡了。
线程池的设计还是比较复杂,要思考得比较全面,也要有一定的基础才能消化,在学习的过程中我产生了很多疑问,最终才能勉强在脑海中构成一个闭环。还有些疑问至今未能寻得答案,或者说给自己一个满意的解释,后面还要不断地探索学习。