ThreadPoolExecutor源码分析
ThreadPoolExecutor是Java自带线程池FixedThreadPool(固定大小)、 SingleThreadExecutor(单线程)、CacheThreadPool (无限大)的具体实现。我们也可以继承此类来实现自己的线程池。
其内部主要实现是通过队列保存需要执行的任务,并通过coreSize和maxSize控制线程池内线程的个数。
ThreadPoolExecutor的关键属性
ctl是存储runState(运行状态)和workerCount(工作线程数量)的一个AtomicInteger。
其中runState存储在ctl(共32位)的高3位,workerCount存储在低29位。
1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//初始为0个workerCount 2 private static final int COUNT_BITS = Integer.SIZE - 3;//29位 3 private static final int CAPACITY = (1 << COUNT_BITS) - 1;//前3位为0,后29位为1 4 5 private static final int RUNNING = -1 << COUNT_BITS;//运行 6 private static final int SHUTDOWN = 0 << COUNT_BITS;//不接受新任务,继续接受队列任务,线程池内任务继续执行 7 private static final int STOP = 1 << COUNT_BITS;//不接受新任务,不接受队列任务,且线程池内任务停止 8 private static final int TIDYING = 2 << COUNT_BITS;//全部任务已经停止,运行终止代码 9 private static final int TERMINATED = 3 << COUNT_BITS;//终止代码运行完毕 10 11 private static int runStateOf(int c) { return c & ~CAPACITY; }//根据ctl获取runState 12 private static int workerCountOf(int c) { return c & CAPACITY; }//根据ctl获取workerCount 13 private static int ctlOf(int rs, int wc) { return rs | wc; }//根据runState、workerCount获取ctl
ctl
ThreadPoolExecutor运行流程
光是看属性比较难理解,我们可以模拟平常使用线程池的方法,看看其内部是怎么运行的。
1 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { 2 return new ThreadPoolExecutor(nThreads, nThreads, 3 0L, TimeUnit.MILLISECONDS, 4 new LinkedBlockingQueue<Runnable>(), 5 threadFactory); 6 } 7 8 public static ExecutorService newCachedThreadPool() { 9 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10 60L, TimeUnit.SECONDS, 11 new SynchronousQueue<Runnable>()); 12 } 13 14 public ThreadPoolExecutor(int corePoolSize, 15 int maximumPoolSize, 16 long keepAliveTime, 17 TimeUnit unit, 18 BlockingQueue<Runnable> workQueue) { 19 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 20 Executors.defaultThreadFactory(), defaultHandler); 21 } 22 23 public ThreadPoolExecutor(int corePoolSize, 24 int maximumPoolSize, 25 long keepAliveTime, 26 TimeUnit unit, 27 BlockingQueue<Runnable> workQueue, 28 ThreadFactory threadFactory, 29 RejectedExecutionHandler handler) { 30 if (corePoolSize < 0 || 31 maximumPoolSize <= 0 || 32 maximumPoolSize < corePoolSize || 33 keepAliveTime < 0) 34 throw new IllegalArgumentException(); 35 if (workQueue == null || threadFactory == null || handler == null) 36 throw new NullPointerException(); 37 this.corePoolSize = corePoolSize; 38 this.maximumPoolSize = maximumPoolSize; 39 this.workQueue = workQueue; 40 this.keepAliveTime = unit.toNanos(keepAliveTime); 41 this.threadFactory = threadFactory; 42 this.handler = handler; 43 }
构造
FixedThreadPool传入的coreSize == maxSize,线程存活时间为无限,工作队列为有缓存(放入时未达到队列缓存值则返回)的无界阻塞队列。
CachedThreadPool传入的coreSize = 0, maxSize为无限大,线程存活时间为60秒,工作队列为无缓存(放入时必须要等待取出后才能返回、即缓存值为1)的阻塞队列。
一般我们调用excute(Runnable r)的方法来使用线程池。
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 5 int c = ctl.get(); 6 if (workerCountOf(c) < corePoolSize) {//当前工作线程小于coreSize 7 if (addWorker(command, true))//新增一个工作线程,并把其第一个任务设为传入的command 8 return;//成功直接返回 9 c = ctl.get(); 10 } 11 if (isRunning(c) && workQueue.offer(command)) {//工作线程达到coreSize,将任务放入队列中,等待工作线程获取 12 int recheck = ctl.get(); 13 if (! isRunning(recheck) && remove(command))//再次确认是否运行中,若无则移除该线程 14 reject(command);//拒绝 15 else if (workerCountOf(recheck) == 0)//workerCount在上面语句执行途中变为0 16 addWorker(null, false);//增加worker 17 } 18 else if (!addWorker(command, false))//添加失败,可能线程池关闭或饱和 19 reject(command);//拒绝 20 } 21 22 private boolean addWorker(Runnable firstTask, boolean core) { 23 retry: 24 for (;;) { 25 int c = ctl.get(); 26 int rs = runStateOf(c); 27 28 // 是否运行中 29 if (rs >= SHUTDOWN && 30 ! (rs == SHUTDOWN && 31 firstTask == null && 32 ! workQueue.isEmpty())) 33 return false; 34 35 for (;;) { 36 int wc = workerCountOf(c);// 37 if (wc >= CAPACITY || 38 wc >= (core ? corePoolSize : maximumPoolSize))//工作线程数量已达最大值 39 return false;//返回增加失败 40 if (compareAndIncrementWorkerCount(c))//尝试CAS增加workerCount 41 break retry;//成功返回 42 c = ctl.get(); // Re-read ctl 43 if (runStateOf(c) != rs)//可能终止 44 continue retry; 45 //增加失败,原因可能为workerCount被其他线程改变,重试内部循环 46 } 47 } 48 49 boolean workerStarted = false; 50 boolean workerAdded = false; 51 Worker w = null; 52 try { 53 w = new Worker(firstTask);//将该任务作为首任务 54 final Thread t = w.thread; 55 if (t != null) { 56 final ReentrantLock mainLock = this.mainLock; 57 mainLock.lock(); 58 try { 59 int rs = runStateOf(ctl.get());//在有锁的情况下重新检查runState 60 61 if (rs < SHUTDOWN || 62 (rs == SHUTDOWN && firstTask == null)) { 63 if (t.isAlive()) //t可用 64 throw new IllegalThreadStateException(); 65 workers.add(w);//放入集合中 66 int s = workers.size(); 67 if (s > largestPoolSize) 68 largestPoolSize = s;//largestPoolSize供外部调用,仅在有锁情况下返回 69 workerAdded = true; 70 } 71 } finally { 72 mainLock.unlock(); 73 } 74 if (workerAdded) { 75 t.start();//开启工作线程 76 workerStarted = true; 77 } 78 } 79 } finally { 80 if (! workerStarted) 81 addWorkerFailed(w); 82 } 83 return workerStarted; 84 }
excute
除了我在代码中另加的注释,jdk官方本身也有对excute执行顺序的解释,我尝试翻译如下:
1.如果小于corePoolSize的线程正在运行,尝试启动新的线程并把传入的command作为其第一个任务。addWorker方法检查runState和workerCount,通过返回false防止错误添加线程。
2.如果任务可以被成功入队,我们依然需要去双重检测在进入方法后,是否已经增加了一个线程(因为存在可能在第一次检测后一个线程结束了)或者线程池结束了。所以我们再次检测state,并当发现若线程池停止了,我们会撤回入队,或者当发现没有线程在执行了,会开启一个新线程。
3.如果我们不能把一个任务成功入队,我们就尝试增加一个新的线程,如果增加线程也失败了,我们就得知线程被终止或者饱和了,拒绝该任务。
任务执行的核心代码显然是在内部类Worker的run方法中,我们去看看。
1 public void run() { 2 runWorker(this);//调用外部的runWorker并把当前的worker传进去 3 } 4 5 final void runWorker(Worker w) { 6 Thread wt = Thread.currentThread(); 7 Runnable task = w.firstTask; 8 w.firstTask = null; 9 w.unlock(); // 解除锁定以允许中断 10 boolean completedAbruptly = true; 11 try { 12 while (task != null || (task = getTask()) != null) {//第一个任务为null,或者从队列取任务时返回null 13 w.lock(); 14 //如果线程池正在停止,确保线程中断,否则确保线程不中断。 15 if ((runStateAtLeast(ctl.get(), STOP) || 16 (Thread.interrupted() && 17 runStateAtLeast(ctl.get(), STOP))) && 18 !wt.isInterrupted()) 19 wt.interrupt();//当前线程应当中断 20 try { 21 beforeExecute(wt, task); 22 Throwable thrown = null; 23 try { 24 task.run();//执行任务 25 } catch (RuntimeException x) { 26 thrown = x; throw x; 27 } catch (Error x) { 28 thrown = x; throw x; 29 } catch (Throwable x) { 30 thrown = x; throw new Error(x); 31 } finally { 32 afterExecute(task, thrown); 33 } 34 } finally { 35 task = null; 36 w.completedTasks++; 37 w.unlock(); 38 } 39 } 40 completedAbruptly = false; 41 } finally { 42 processWorkerExit(w, completedAbruptly);//处理回收的worker 43 } 44 }
run
其中getTask中允许线程的复用,我们去看一下。
1 private Runnable getTask() { 2 boolean timedOut = false; // Did the last poll() time out? 3 4 for (;;) { 5 int c = ctl.get(); 6 int rs = runStateOf(c); 7 8 // 检测线程池状态 9 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 10 decrementWorkerCount(); 11 return null;//返回null由worker自己结束 12 } 13 14 int wc = workerCountOf(c); 15 16 // workerCount大于corePoolSize时候、为true 17 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 18 19 //大于maximum或者上次获取已经超时 20 if ((wc > maximumPoolSize || (timed && timedOut)) 21 && (wc > 1 || workQueue.isEmpty())) { 22 if (compareAndDecrementWorkerCount(c)) 23 return null;//返回null由worker自己结束 24 continue;//cas失败,继续尝试获取task 25 } 26 27 try { 28 Runnable r = timed ? 29 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://大于core,在keepAliveTime时间内尝试从队列获取任务 30 workQueue.take();//小于或等于,阻塞获取任务 31 if (r != null) 32 return r; 33 timedOut = true;//r==null,超时,进入下次循环 34 } catch (InterruptedException retry) { 35 timedOut = false; 36 } 37 } 38 }
getTask
主要就是判断了超时时间,CachedThreadPool空闲线程的回收也就是通过这个来判断的。
而FixedThreadPool的线程则一直在队列获取时阻塞。
另外还有个processWorkerExit比较简单,主要是从worker集合中删除,这里就不再说明了。