SpringMVC源码情操陶冶#task-executor解析器
承接Spring源码情操陶冶-自定义节点的解析。线程池是jdk的一个很重要的概念,在很多的场景都会应用到,多用于处理多任务的并发处理,此处借由spring整合jdk的cocurrent包的方式来进行深入的分析
spring配置文件样例
配置简单线程池
<task-executor keep-alive="60" queue-capacity="20" pool-size="5" rejection-policy="DISCARD">
</task-executor>
以上属性是spring基于task命令空间提供的对外属性,一般是线程池的基础属性,我们可以剖析下相应的解析类具体了解下spring是如何整合线程池的
ExecutorBeanDefinitionParser-解析task-executor节点
-
我们可以直接看下解析的具体方法
doParse()
,代码如下@Override protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { String keepAliveSeconds = element.getAttribute("keep-alive"); if (StringUtils.hasText(keepAliveSeconds)) { builder.addPropertyValue("keepAliveSeconds", keepAliveSeconds); } String queueCapacity = element.getAttribute("queue-capacity"); if (StringUtils.hasText(queueCapacity)) { builder.addPropertyValue("queueCapacity", queueCapacity); } configureRejectionPolicy(element, builder); String poolSize = element.getAttribute("pool-size"); if (StringUtils.hasText(poolSize)) { builder.addPropertyValue("poolSize", poolSize); } }
很简单就是读取我们第一部分所罗列的属性值,并塞入至BeanDefinitionBuilder对象的属性集合中。借此
对任务的拒绝策略属性解析方法configureRejectionPolicy()
我们也可以简单的看下private void configureRejectionPolicy(Element element, BeanDefinitionBuilder builder) { String rejectionPolicy = element.getAttribute("rejection-policy"); if (!StringUtils.hasText(rejectionPolicy)) { return; } String prefix = "java.util.concurrent.ThreadPoolExecutor."; if (builder.getRawBeanDefinition().getBeanClassName().contains("backport")) { prefix = "edu.emory.mathcs.backport." + prefix; } String policyClassName; if (rejectionPolicy.equals("ABORT")) { policyClassName = prefix + "AbortPolicy"; } else if (rejectionPolicy.equals("CALLER_RUNS")) { policyClassName = prefix + "CallerRunsPolicy"; } else if (rejectionPolicy.equals("DISCARD")) { policyClassName = prefix + "DiscardPolicy"; } else if (rejectionPolicy.equals("DISCARD_OLDEST")) { policyClassName = prefix + "DiscardOldestPolicy"; } else { policyClassName = rejectionPolicy; } builder.addPropertyValue("rejectedExecutionHandler", new RootBeanDefinition(policyClassName)); }
由此可看出拒绝策略采取的基本上是
java.util.concurrent.ThreadPoolExecutor
工具包下的静态内部类,同时也支持用户自定义的拒绝策略。对拒绝策略此处作下小总结rejection-policy(Spring) jdk对应类 含义 ABORT java.util.concurrent.ThreadPoolExecutor.AbortPolicy 抛出拒绝的异常信息 CALLER_RUNS java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy 直接执行对应的任务(线程池不关闭) DISCARD java.util.concurrent.ThreadPoolExecutor.DiscardPolicy 直接丢弃任务 DISCARD_OLDEST java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy 直接丢弃队列最老的任务(最靠头部),塞入此任务到队列尾部 -
bean实体类
那么我们肯定要知道是spring的哪个bean来实例化我们的线程池配置呢?答案就在getBeanClassName()
方法@Override protected String getBeanClassName(Element element) { return "org.springframework.scheduling.config.TaskExecutorFactoryBean"; }
直接通过
TaskExecutorFactoryBean
bean工厂来实例化线程池,很有意思,我们也别忘了其获取实例化对象其实是调用了其内部的getObject()
方法。我们继续跟踪把
TaskExecutorFactoryBean-线程池bean工厂类
看继承结构,我们细心的发现其实现了InitializingBean
接口,那我们直接关注afterPropertiesSet()
方法
@Override
public void afterPropertiesSet() throws Exception {
// 实例化的bean类型为ThreadPoolTaskExecutor
BeanWrapper bw = new BeanWrapperImpl(ThreadPoolTaskExecutor.class);
determinePoolSizeRange(bw);
// 基本属性保存
if (this.queueCapacity != null) {
bw.setPropertyValue("queueCapacity", this.queueCapacity);
}
if (this.keepAliveSeconds != null) {
bw.setPropertyValue("keepAliveSeconds", this.keepAliveSeconds);
}
if (this.rejectedExecutionHandler != null) {
bw.setPropertyValue("rejectedExecutionHandler", this.rejectedExecutionHandler);
}
if (this.beanName != null) {
bw.setPropertyValue("threadNamePrefix", this.beanName + "-");
}
// 实例化ThreadPoolTaskExecutor对象
this.target = (TaskExecutor) bw.getWrappedInstance();
// 并执行相应的afterPropertiesSet方法
if (this.target instanceof InitializingBean) {
((InitializingBean) this.target).afterPropertiesSet();
}
}
真正实例化的对象为
ThreadPoolTaskExecutor.class
task-executor
指定的pool-size
支持-
作为分隔符,比如2-4
,表示线程池核心线程数为2个,最大线程数为4;如果没有分隔符,则最大线程数等同于核心线程数
task-executor
如果指定pool-size
为0-4
且queue-capacity
为null,则会指定线程池的allowCoreThreadTimeout
为true,表明支持核心线程超时释放,默认不支持
ThreadPoolTaskExecutor.class
也是InitializingBean
的实现类,也会被调用afterPropertiesSet()
方法
ThreadPoolTaskExecutor#afterPropertiesSet()-实例化
直接查看initializeExecutor()
初始化jdk对应的线程池
// 默认的rejectedExecutionHandler为AbortPolicy策略
@Override
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
// 先创建阻塞的队列
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);
// 设置是否允许核心线程超时被收回。默认不允许
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
this.threadPoolExecutor = executor;
return executor;
}
我们按照上述的注释,按照两步走进行解析
-
阻塞队列的创建
protected BlockingQueue<Runnable> createQueue(int queueCapacity) { if (queueCapacity > 0) { return new LinkedBlockingQueue<Runnable>(queueCapacity); } else { return new SynchronousQueue<Runnable>(); } }
默认情况下是创建
LinkedBlockingQueue
链式队列,因为默认的queueCapacity
大小为Integer.MAX_VALUE
。而queueCapacity
为0的情况下则采取SynchronousQueue
同步队列,其约定塞入一个元素必须等待另外的线程消费其内部的一个元素,其内部最多指定一个元素用于被消费 -
线程池创建
用到最基本的构造方法ThreadPoolExecutor()
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
篇幅限于过长,具体就不解释了直接看注释就明白了
TaskExecutorFactoryBean#getObject()-获取实体类
public TaskExecutor getObject() {
return this.target;
}
由上述可知this.target
对应的class类为ThreadPoolTaskExecutor
,其内部已实例化了ThreadPoolExecutor
线程池对象
小结
Spring创建线程池本质上是通过
ThreadPoolExecutor
的构造方法来进行创建的。由此可知jdk
的concurrent
工具包很有参考价值Spring默认指定的线程池队列为
LinkedBlockingQueue
链式队列,默认支持无线的任务添加,用户也可以指定queue-capacity
来指定队列接受的最多任务数;并采用AborPolicy
策略来拒绝多余的任务Spring指定的
pool-size
支持-
分隔符,具体解释见上文Spring的
task-executor
多与task-scheduler
和task-scheduled-tasks
搭配使用,具体见后文分析