深入浅出Java线程池:使用篇完整的后端开发流程深入浅出Java线程池:使用篇
手动步骤走一种完整的后端开发流程
服务端
1、将远程仓库的jar包 拷贝 到本地仓库
2、将项目代码 拷贝 到本地 并建立路径 能够执行编译
3、编译打包项目(package)至项目下,项目跑起来后进行本地测试
4、版本稳定后,上测试环境
上测试环境
1、将远程仓库的jar包 拷贝 到测试环境
2、将本地的项目代码 上传 到测试环境 pom能建立路径 执行mvn脚本进行编译打包
3、编译打包项目(package)至项目下,项目跑起来后进行测试
4、版本在测试环境稳定后,install至本地仓库,在上传至远程仓库
5、不推荐嫌麻烦直接上传本地jar包的方式,因为这样无法发现由于环境造成的错误而且传输速度没有直接编译的快
客户端联调
1、将远程仓库的jar包(包括刚刚上传的服务端jar) 拷贝 到本地仓库
2、将项目代码 拷贝 到本地 并建立路径 能够执行编译
3、编译打包项目(package)至项目下,项目跑起来后进行本地测试
4、项目注册至RPC服务中来访问跑在测试环境的服务端项目
5、版本稳定后,上测试环境联调。
团队的技术栈,基于这个背景再展开后面将提到的几个问题,将会有更深刻的体会。
控制层基于SpringMvc,数据持久层基于JdbcTemplate自己封装了一套类MyBatis的Dao框架,视图层基于Velocity模板技术,其余组件基于SpringCloud全家桶。
问题1
某应用发布以后开始报数据库连接池不够用异常,日志如下:
1
|
com.alibaba.druid.pool.GetConnectionTimeoutException: wait millis 60000, active 500, maxActive 500, creating 0 |
很明显这是数据库连接池满了,当时处于业务低峰期,所以很显然并不是由于流量突发造成的,另一种可能性是长事务导致,一般是事务中掺杂了外部网络调用,最终跟业务负责人一起排除了长事务的可能性。
还有什么可能呢?我随即想到了是不是没有释放连接导致,我跟业务负责人说了这个想法,他说这种可能性不大,连接的获取和释放都是由框架完成的,如果这块有问题早反映出来了,我想也是。
框架的确给我们带来了很大的便利性,将业务中一些重复性的工作下沉到框架中,提高了研发效率,不夸张的说有些人脱离了Spring,MyBatis,SpringMvc这些框架,都不会写代码了。
那会是什么原因呢?我又冒出来一个想法,有没有可能是某些功能框架支持不了,所以开发绕过了框架自己实现,进而导致连接没有释放,我跟业务负责人说了这个想法以后,他说:“这个有可能,这次有个功能需要获取到数据库名,所以自己通过Connection对象获取的”,说到这儿答案大概已经出来了,一起看下这段代码:
public String getSchema(String tablename, boolean cached) throws Exception { return this.getJdbcTemplate(tablename).getDataSource().getConnection().getCatalog(); }
代码很简单通过JdbcTemplate获取DataSource,再通过DataSource获取Connection,最终通过Connection获取数据库名,就是这一行简单的代码将数据库连接耗尽,因为这里并没有释放连接的动作,之前的为什么都没有问题呢,因为普通的查询都是委派给JdbcTemplate来实现的,它内部会释放连接,找一个简单的query方法看下:
public <T> T query(PreparedStatementCreator psc, @Nullable final PreparedStatementSetter pss, final ResultSetExtractor<T> rse) throws DataAccessException { Assert.notNull(rse, "ResultSetExtractor must not be null"); this.logger.debug("Executing prepared SQL query"); return this.execute(psc, new PreparedStatementCallback<T>() { @Nullable public T doInPreparedStatement(PreparedStatement ps) throws SQLException { ResultSet rs = null; Object var3; try { if (pss != null) { pss.setValues(ps); } rs = ps.executeQuery(); var3 = rse.extractData(rs); } finally { JdbcUtils.closeResultSet(rs); if (pss instanceof ParameterDisposer) { ((ParameterDisposer)pss).cleanupParameters(); } } return var3; } }); }
public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action) throws DataAccessException { Assert.notNull(psc, "PreparedStatementCreator must not be null"); Assert.notNull(action, "Callback object must not be null"); if (this.logger.isDebugEnabled()) { String sql = getSql(psc); this.logger.debug("Executing prepared SQL statement" + (sql != null ? " [" + sql + "]" : "")); } Connection con = DataSourceUtils.getConnection(this.obtainDataSource()); PreparedStatement ps = null; Object var13; try { ps = psc.createPreparedStatement(con); this.applyStatementSettings(ps); T result = action.doInPreparedStatement(ps); this.handleWarnings((Statement)ps); var13 = result; } catch (SQLException var10) { if (psc instanceof ParameterDisposer) { ((ParameterDisposer)psc).cleanupParameters(); } String sql = getSql(psc); psc = null; JdbcUtils.closeStatement(ps); ps = null; DataSourceUtils.releaseConnection(con, this.getDataSource()); con = null; throw this.translateException("PreparedStatementCallback", sql, var10); } finally { if (psc instanceof ParameterDisposer) { ((ParameterDisposer)psc).cleanupParameters(); } JdbcUtils.closeStatement(ps); DataSourceUtils.releaseConnection(con, this.getDataSource()); } return var13; }
query方法基于execute这个模板方法实现,在execute内部会通过finally来确保连接的释放
DataSourceUtils.releaseConnection,所以不会有连接耗尽的问题,问题已经很清晰了,改造也很简单,大概有几下几种方法:
1.显示的关闭连接,这里可以借助jdk的try resource语句,简单明了。
public String getSchema(String tablename, boolean cached) throws Exception { try(Connection connection = this.getJdbcTemplate(tablename).getDataSource().getConnection()){ return connection.getCatalog(); } }
2.借助于JdbcTemplate的模板方法设计思想来解决,它提供了一个execute方法,用户只要实现ConnectionCallback这个接口就可以获取到Connection对象,在内部执行获取数据库名的逻辑,最终关闭连接由finally完成。
/** * Execute a JDBC data access operation, implemented as callback action * working on a JDBC Connection. This allows for implementing arbitrary * data access operations, within Spring\'s managed JDBC environment: * that is, participating in Spring-managed transactions and converting * JDBC SQLExceptions into Spring\'s DataAccessException hierarchy. * <p>The callback action can return a result object, for example a domain * object or a collection of domain objects. * @param action a callback object that specifies the action * @return a result object returned by the action, or {@code null} if none * @throws DataAccessException if there is any problem */ @Nullable public <T> T execute(ConnectionCallback<T> action) throws DataAccessException { Assert.notNull(action, "Callback object must not be null"); Connection con = DataSourceUtils.getConnection(this.obtainDataSource()); Object var10; try { Connection conToUse = this.createConnectionProxy(con); var10 = action.doInConnection(conToUse); } catch (SQLException var8) { String sql = getSql(action); DataSourceUtils.releaseConnection(con, this.getDataSource()); con = null; throw this.translateException("ConnectionCallback", sql, var8); } finally { DataSourceUtils.releaseConnection(con, this.getDataSource()); } return var10; }
jdbcTemplate.execute(new ConnectionCallback<Object>() { @Override public Object doInConnection(Connection connection) throws SQLException, DataAccessException { return connection.getCatalog(); } });
虽然两种都能解决问题,但我还是更推崇第二种方式,因为这种更贴合框架的设计思想,将一些重复性的逻辑继续交给框架去实现,这里也体现出框架很重要的一个特点,就是对使用者提供扩展。
问题2
前几天写了一个Spring AOP的拦截功能,发现怎么也进不到拦截逻辑中,表达式确定没问题,让我百思不得其解,最终冷静下来逐步排错。
第一个很明显的错误是被拦截的对象并没有纳入Spring管理,所以当即把对象交由Spring管理,问题依然没有解决,我开始回想代理的原理。
Spring代理提供了两种实现方式,一种是jdk的动态代理,另一种是cglib代理,这两种方式分别适用于代理类实现了接口和代理类未实现接口的情况,其内部思想都是基于某种规约(接口或者父类)来生成一个Proxy对象,在Proxy对象方法调用时先调用InvocationHandler的invoke方法,在invoke方法内部先执行代理逻辑,再执行被代理对象的真实逻辑,这里贴一段jdk动态代理生成的Proxy对象的源文件供大家阅读:
public class ProxyTest { /** 定义目标接口,内部包含一个hello方法(这其实就是一个规约) */ public interface ProxyT{ void hello(); } /** 实现类,实现了ProxyT接口 */ public static class ProxyTImpl implements ProxyT{ @Override public void hello() { System.out.println("aaaa"); } } public static void main(String[] args) { //设置生成Proxy对象的源文件 System.getProperties().put("sun.misc.ProxyGenerator.saveGeneratedFiles", "true"); ProxyT proxyT1 = (ProxyT)Proxy.newProxyInstance(ProxyT.class.getClassLoader(),new Class[]{ProxyT.class}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { System.out.println("invoke"); return method.invoke(proxyT,args); } }); proxyT1.hello(); } }
最终生成的Proxy源文件如下:
package com.sun.proxy; import coding4fun.ProxyTest.ProxyT; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.lang.reflect.UndeclaredThrowableException; /** 生成的proxy源码,继承jdk的Proxy类,实现了ProxyT接口 (这里其实也解释了为什么jdk的动态代理只能基于接口实现,不能基于父类,因为Proxy 必须继承jdk的Proxy,而java又是单继承,所以Proxy只能基于接口这个规约来生成) */ public final class $Proxy0 extends Proxy implements ProxyT { private static Method m1; private static Method m3; private static Method m2; private static Method m0; public $Proxy0(InvocationHandler var1) throws { super(var1); } public final boolean equals(Object var1) throws { try { return (Boolean)super.h.invoke(this, m1, new Object[]{var1}); } catch (RuntimeException | Error var3) { throw var3; } catch (Throwable var4) { throw new UndeclaredThrowableException(var4); } } //hello方法将调用权交给了InvocationHandler public final void hello() throws { try { super.h.invoke(this, m3, (Object[])null); } catch (RuntimeException | Error var2) { throw var2; } catch (Throwable var3) { throw new UndeclaredThrowableException(var3); } } public final String toString() throws { try { return (String)super.h.invoke(this, m2, (Object[])null); } catch (RuntimeException | Error var2) { throw var2; } catch (Throwable var3) { throw new UndeclaredThrowableException(var3); } } public final int hashCode() throws { try { return (Integer)super.h.invoke(this, m0, (Object[])null); } catch (RuntimeException | Error var2) { throw var2; } catch (Throwable var3) { throw new UndeclaredThrowableException(var3); } } static { try { m1 = Class.forName("java.lang.Object").getMethod("equals", Class.forName("java.lang.Object")); m3 = Class.forName("coding4fun.ProxyTest$ProxyT").getMethod("hello"); m2 = Class.forName("java.lang.Object").getMethod("toString"); m0 = Class.forName("java.lang.Object").getMethod("hashCode"); } catch (NoSuchMethodException var2) { throw new NoSuchMethodError(var2.getMessage()); } catch (ClassNotFoundException var3) { throw new NoClassDefFoundError(var3.getMessage()); } } }
到这里其实我已经有了答案,是我给Spring的规约(接口或者父类)出现了问题,首先我要代理的类并没有实现接口,所以这里的规约不是接口,而是我这个类本身,从cglib的原理来讲,它是将要代理的类作为父类来生成一个Proxy类,重写要代理的方法,进而添加代理逻辑,问题就在于我那个类的方法是static的,而static方法是没法重写的,所以导致一直没有进拦截逻辑,将static方法改为实例方法就解决了问题,这里贴一段cglib动态代理生成的Proxy对象的源文件供大家阅读:
public class cglibtest { //定义被代理的类ProxyT,内部有一个hello方法 public static class ProxyT{ public void hello() { System.out.println("aaaa"); } } //定义一个方法拦截器,和jdk的InvocationHandler类似 public static class Interceptor implements MethodInterceptor { @Override public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable { //简单的打印 System.out.println("before invoke hello"); //执行被代理类的方法(hello) return methodProxy.invokeSuper(o,objects); } } public static void main(String[] args) { // 设置CGLib代理类的生成位置 System.setProperty(DebuggingClassWriter.DEBUG_LOCATION_PROPERTY, "./cg"); // 设置JDK代理类的输出 System.getProperties().put("sun.misc.ProxyGenerator.saveGeneratedFiles", "true"); MethodInterceptor methodInterceptor = new Interceptor(); Enhancer enhancer = new Enhancer(); //设置父类 enhancer.setSuperclass(ProxyT.class); //设置方法回调 enhancer.setCallback(methodInterceptor); ProxyT proxy = (ProxyT)enhancer.create(); proxy.hello(); } }
最终生成的Proxy源文件如下(删除了部分代码,只保留了重写hello方法逻辑):
//继承ProxyT public class cglibtest$ProxyT$$EnhancerByCGLIB$$8b3109a3 extends ProxyT implements Factory { final void CGLIB$hello$0() { super.hello(); } //重写hello方法 public final void hello() { //方法拦截器 MethodInterceptor var10000 = this.CGLIB$CALLBACK_0; if (var10000 == null) { CGLIB$BIND_CALLBACKS(this); var10000 = this.CGLIB$CALLBACK_0; } if (var10000 != null) { //执行方法拦截器 var10000.intercept(this, CGLIB$hello$0$Method, CGLIB$emptyArgs, CGLIB$hello$0$Proxy); } else { super.hello(); } } }
总结
前面描述了笔者近期工作中遇到的两个问题,不能说多么有难度,但是我相信应该有不少人都碰到过,不知道你是怎么解决的呢?解决了以后有没有深挖其背后的原理呢,好多人说自己的工作都是简单的crud没有提高,那何不尝试着深挖框架背后的原理,深挖那些看似普通但背后并不简单的问题的本质。
框架虽好,但不要丢了其背后的原理。
<h3>深入浅出Java线程池:使用篇</h3>
前言
很高兴遇见你~
借助于很多强大的框架,现在我们已经很少直接去管理线程,框架的内部都会为我们自动维护一个线程池。例如我们使用最多的okHttp以及他的封装框架Retrofit,线程封装框架RxJava和kotlin协程等等。为了更好地使用这些框架,则必须了解他的实现原理,而了解他的原理,线程池是永远绕不开的话题。
线程的创建与切换的成本是比较昂贵的。JVM的线程实现使用的是轻量级进程,也就是一个线程对应一个cpu核心。因此在创建与切换线程时,则会涉及到系统调用,是开销比较大的过程。为了解决这个问题,线程池诞生了。
与很多连接池,如sql连接池、http连接池的思想类似,线程池的出现是为了复用线程,减少创建和切换线程所带来的开销,同时可以更方便地管理线程。线程池的内部维护有一定数量的线程,这些线程就像一个个的“工人”,我们只需要向线程池提交任务,那么这些任务就会被自动分配到这些“工人”,也就是线程去执行。
线程池的好处:
- 减少资源损耗。重用线程、控制线程数量,减少线程创建和切换所带来的开销。
- 提高响应速度。可直接使用线程池中空闲的线程而不必等待线程的创建。
- 方便管理线程。线程池可以对其中的线程进行简单的管理,如实时监控数据调整参数、设置定时任务等。
这个系列文章主要分两篇:使用篇与原理篇。作为使用篇,顾名思义,主要是了解什么是线程池以及如何正确去使用它。
线程池的主要实现类有两个:ThreadPoolExecutor和ScheduledThreadPoolExecutor,后者继承了前者。线程池的配置参数比较多,系统也预设了一些常用的线程池,放在Executors中,开发者只需要配置简单的参数就可以。线程池的可执行任务有两种对象:Runnable和Callable,这两种对象可以直接被线程池执行,以及他们的异步返回对象Future。
那么以上讲到的,就是本文要讨论的内容。首先,从线程池的可执行任务开始。
任务类型
Runnable
public interface Runnable {
public abstract void run();
}
Runnable我们是比较熟悉的了。他是一个接口,内部只有一个方法 run
,没有参数,没有返回值。当我们提交一个Runnable对象给线程池执行时,他的 run
方法会被执行。
Callable
public interface Callable<V> {
V call() throws Exception;
}
与Runnable很类似,最大的不同就是他拥有返回值,而且会抛出异常。任务对象适合用于需要等待一个返回值的后台计算任务。
Future
public interface Future<V> {
// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否被取消
boolean isCancelled();
// 任务是否已经完成
boolean isDone();
// 获取返回值
V get() throws InterruptedException, ExecutionException;
// 在一定的时间内等待返回值
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Futrue他并不是一个任务,而是一个计算结果异步返回的管理对象。前面的Callable任务提交给线程池之后,他需要一定的计算时间才能将结果返回,所以线程池会返回一个Future对象。我们可以调用他的 isDone
方法来检测是否完成,如果完成可以调用 get
方法来获取结果。同时也可以通过 cancel
和 isCancel
来取消或者判断任务是否被取消。如下图:
- 当Future未被执行或正在执行中时,get方法会阻塞直到执行完成。如果不希望等待时间过长,可以调用它另外一个带有时间参数的方法,该方法等待指定时间之后,如果任务尚未完成则会抛出TimeoutException异常。
- 当Future执行完成之后,get方法会返回结果;而此时如果任务是被取消,那么会抛出异常。
- 当一个任务尚未被执行时,cancel方法会让该任务不会被执行,直接结束。
- 当任务正在执行,cancel方法的参数如果为false,那么不会中断任务;而如果参数是true则会尝试中断任务。
- 当任务已完成,取消方法返回false,取消失败。
Futrue接口的具体实现类是FutureTask,该类同时实现了Runnable接口,可以被线程池直接执行。我们可以通过传入一个Callable对象来创建一个FutureTask。如果是Runnable对象,则可以通过 Executors.callable()
来构造一个Callable对象,只不过这个Callable对象返回null,所构造出来的Future对象get方法在成功时也会返回null。当FutureTask的 run
方法被执行后,其所包含的任务开始执行。
当然,我们也可以单独使用FutureTask,如下:
// 创建一个FutureTask
FutureTask<String> future = new FutureTask<>(() -> {
Thread.sleep(1000);
return "一只修仙的猿";
});
// 使用一个后台线程来执行他的run方法
new Thread(future).start();
// 执行结束之后可以通过他的get方法得到返回值
System.out.println(future.get());
当然,如果我们要这样使用的话,那为什么不用线程池呢?(手动狗头)
接下来介绍线程池的两个主要类:ThreadPoolExecutor和ScheduledThreadPoolExecutor。
ThreadPoolExecutor
概述
我们常说的线程池,很大程度上指的就是ThreadPoolExecutor,他也是我们使用最为频繁的线程池。ThreadPoolExecutor的内部核心角色有三个:等待队列、线程和拒绝策略者:
线程池的内部很像一个工厂,等待队列就如同一个流水线,我们的任务就放在里面。 线程分为核心线程和非核心线程,他们就如同一个个的 “工人” ,从等待队列中获取任务进行执行。而如果这个“工厂”已经无法接受更多的任务,那么这个任务就会交给拒绝策略者去处理。
这里要特别注意的是,图中我分为两种线程是为了方便理解,而在实际中, 线程本身并没有核心与非核心之分 ,只有线程数大于核心线程数与小于核心线程数之分 。
例如核心线程数限制为3,但现在有4个线程正在工作:甲乙丙丁。当甲先执行完成时进入空闲状态时,因为总数超出设定的3,那么甲会被认为非核心线程被关闭。同理,如果丁先执行完成任务,则是丁被认为非核心线程被关闭。
ThreadPoolExecutor并不是把每个新任务都直接放到等待队列中,而是有一套既定的规则来执行每个新任务:
- 情况1:在线程数没有达到核心线程数时,每个新任务都会创建一个新的线程来执行任务。
- 情况2:当线程数达到核心线程数时,每个新任务会被放入到等待队列中等待被执行。
- 情况3:当等待队列已经满了之后,如果线程数没有到达总的线程数上限,那么会创建一个非核心线程来执行任务。
- 情况4:当线程数已经到达总的线程数限制时,新的任务会被拒绝策略者处理,线程池无法执行该任务。
这里我们可以发现对于线程池中的角色,有各种各样的数量上限,具体的有以下参数:
- 核心线程数corePoolSize:指定核心线程的数量上限;当线程数小于核心线程数,那么线程是不会被销毁的,除非通设置线程池的
allowCoreThreadTimeOut
参数为true,那么核心线程在等待keepAliveTime
时间之后,就会被销毁。 - 允许核心线程被回收allowCoreThreadTimeOut :是否允许核心线程被回收。
- 最大线程数maximumPoolSize:指定线程总数的上限。线程总数=核心线层数+非核心线程数。当等待队列满了之后,新的任务会创建新的非核心线程来执行,直到线程数到达总数的上限。
- 线程闲置时间keepAliveTime:线程空闲等待该时间后会被销毁。非核心线程默认会被销毁,而核心线程则需要开发者自己设定是否允许被销毁。
- 等待队列BlockingQueue:存放任务的队列。我们可以在创建队列的时候设置队列的长度。一般使用的队列有LinkedBlockingQueue、ArrayBlockingQueue、SychronizeQueue、PriorityBlockingQueue等,前两者是普通的阻塞队列,最后一个是优先阻塞队列,剩下的一个是没有容量的队列。
- 拒绝策略者RejectExecutionHandler :线程池无法处理的任务就会交给他处理。
通过设置线程池的这些参数可以让线程池拥有完全不同的特性,来适应不同的情景。通常我们在ThreadPoolExecutor的构造方法中,会传入这些参数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
基本都是我们上面介绍过的参数,有两个参数我们还没见过:
- 时间单位unit:给前面的keepAliveTime指定时间单位。
- 线程工厂threadFactory: 线程池会通过线程工厂来创建新的线程,主要是给线程指定一个有意义的名字。
当然,我们也可以不用全部指定上面的参数,ThreadFactory和RejectedExecutionHandler不指定可以使用默认的参数。
配置ThreadPoolExecutor
经过概述,我们对ThreadPoolExecutor的参数以及内部结构已经非常清楚了,接下来探讨一下如何合理地进行配置。
首先是核心线程数。核心线程太少,线程之间会互相争夺cpu资源,多个线程之间进行时间片轮换,大量的线程切换工作会增大系统的开销;核心线程太少,会导致一些cpu核心在挂起等待阻塞操作,没法充分利用cpu资源;核心线程数的配置就是要均衡这两个特点。
关于核心线程数的配置,有一个广为人知的默认规则:
cpu密集型任务,设置为CPU核心数+1;
IO密集型任务,设置为CPU核心数*2;深入浅出Java线程池:使用篇
CPU密集型任务指的是需要cpu进行大量计算的任务,这个时候我们需要尽可能地压榨CPU的利用率。此时核心数不宜设置过大,太多的线程会互相抢占cpu资源导致不断切换线程,反而浪费了cpu。最理想的情况是每个CPU都在进行计算,没有浪费。但很有可能其中的一个线程会突然挂起等待IO,此时额外的一个等待线程就可以马上进行工作,而不必等待挂起结束。
IO密集型任务指的是任务需要频繁进行IO操作,这些操作会导致线程长时间处于挂起状态,那么需要更多的线程来进行工作,不会让cpu都处于挂起状态,浪费资源。一般设置为cpu核心数的两倍即可。
当然实际情况还需要根据任务具体特征来配置,例如系统资源有限,有一些线程被挂在后台持续工作,那么这个时候就必须适当减少线程数,从而减少线层切换的次数。
第二是阻塞队列的配置。
阻塞队列有4个内置的选择:LinkedBlockingQueue、ArrayBlockingQueue、SychronizeQueue和PriorityBlockingQueue,其次还有比较少用的DelayedWorkQueue。
如果学习过Android的Handler机制的话,会发现他们跟Handler内部的MessageQueue是非常像的。ThreadPoolExecutor中的线程,相当于Handler的Looper;阻塞队列,相当于Handler的MessageQueue。他们都会去队列中获取新的任务,当队列为空时,queue.poll()
方法会进行阻塞,直到下一个新的任务来临。
LinkedBlockingQueue、ArrayBlockingQueue都是普通的阻塞队列,尾插头出;区别是后者在创建的时候必须指定长度。
SychronizeQueue是一个没有容量的队列,每一个插入到这个队列的任务必须马上找到可以执行的线程,如果没有则拒绝执行。
PriorityBlockingQueue是具有优先级的阻塞队列,里面的任务会根据设置的优先级进行排序。所以优先级低的任务可能会一直被优先级高的任务顶下去而得不到执行。
DelayedWorkQueue则非常像Handler中的MessageQueue了,可以给任务设置延时。
阻塞队列的选择也是非常重要,一般来说必须要指定阻塞队列的长度。如果使用无限长的队列,那么有可能在大量的任务到来时直接OOM,程序崩溃。
第三是线程总数。
在阻塞队列满了之后,如果线程总数还没到达上限,会创建额外的线程来执行。这对应付突然的大量但轻量的任务的时候有奇效。通过创建更多的线程来提高并发效率。
但是同时也要注意,如果线程数量太多,会造成频繁进行线程切换导致高昂的系统开销。
第四是线程存活时间。
这里一般指非核心线程的存活时间。这个参数的意义在于,当线程都进入空闲的时候,可以回收部分线程来减少系统资源占用。为了提高线程池的响应速度,一般比较少去回收核心线程。
第五是线程工厂。
这个参数比较简单,继承接口然后重写 newThread
方法即可。主要的用途在于给创建的线程设置一个有意义的名字。
最后一个是拒绝策略者。
ThreadPoolExecutor内置有4种策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。默认是使用AbortPolicy,我们可以在构造方法中传入这些类的实例,在任务被拒绝的时候,会回调他们的 rejectedExecution
方法。
- AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
- DiscardPolicy:也是丢弃任务,但是不抛出异常。
- DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
- CallerRunsPolicy:由调用线程直接执行该任务
当然我们也可以自己实现RejectedExecutionHandler接口来自定义自己的拒绝策略。
线程池的使用
- execute(Runnable command) :执行一个任务
- Future submit(Runnable/Callable):提交一个任务,这个任务拥有返回值
- shutDown/shutDownNow:关闭线程池。后者还有去尝试中断正在执行的任务,也就是调用正在执行的线程的interrupt方法
- preStartAllCoreThread:提前启动所有的线程
- isShutDown:线程池是否被关闭
- isTerminad:线程池是否被终止。区别于shutDown,这个状态下的线程池没有正在执行的任务
这些都是比较常用的api,更多的api读者可以去阅读api文档。
监控线程池
ThreadPoolExecutor中有很多参数可以提供我们参考线程池的运行情况:
- largestPoolSize:线程池中曾经到达的最大线程数量
- completedTaskCount:完成的任务个数
- getAliveCount:获取当前正在执行任务的线程数
- getTaskCount:获取当前任务个数
- getPoolSize:获取当前线程数
此外还有很多的get方法来获取相关参数,提供动态的线程池运行情况监控,感兴趣的读者可以去阅读相关的api。
ThreadPoolExecutor中还有提供了一些的回调方法:
- beforeExecute:在任务执行前被调用
- afterExecute:在任务执行之后被调用
- terminated:在线程池终止之后被调用
我们可以通过继承ThreadPoolExecutor来重写这些方法。
ScheduledThreadPoolExecutor
概述
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,整体的内部结构和ThreadPoolExecutor是一样的。有一个重点的不同就是阻塞队列使用的是DelayedWorkQueue。他可以根据我们设置的时间延迟来对任务进行排序,让任务按照时间顺序进行执行,和MessageQueue非常像。
ScheduledThreadPoolExecutor实现了ScheduledExecutorService接口,有一系列可以设置延时任务、周期任务的api。
定时周期任务我们会想到Timer这个框架。这里简单做个对比:
- Timer是系统时间敏感的,他会根据系统时间来触发任务
- Timer内部只有一个线程,可能会造成执行任务阻塞无法按时执行;而ScheduledThreadPoolExecutor可以创建多个线程来执行任务
- Timer遇到异常时会直接抛出,线层终止;而ScheduledThreadPoolExecutor有一个更加完善的异常处理机制。
参数配置
先看到ShcduledhreadPoolExecutor的构造器:
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
ScheduledThreadPoolExecutor有几个构造器,参数数量最多的是上面这个。父类是ThreadPoolExecutor,所以这里是直接调用到ThreadPoolExecutor的构造器:
- 核心线程数:这个由我们自己传入参数设定
- 线程数上限:这个设置Integer.MAX_VALUE,可以认为是没有上限
- keepAliveTime:10毫秒,基本上只要线程进入空闲状态马上就会被回收
- 阻塞队列:DelayedWorkQueue在上面介绍过了,可以设置延迟时间的阻塞队列
- 线程工厂和拒绝策略由开发者自定义
可以看到ShceduledThreadPoolExecutor的配置还是比较简单的,重点在于他的延时阻塞队列可以设置延时任务;keepAliveTime时间的设置让空闲的线程马上就会被回收。
线程池的使用
ScheduledThreadPoolExecutor有ThreadPoolExecutor的接口,同时还包含了一些定时任务的接口:
- schedule:传入一个任务和延迟的时间,在延迟时间之后开始执行任务
- scheduleAtFixedRate:设置固定时间点指定任务。传入两个时间,第一次执行在延迟初始化的时间后;之后每隔指定时间执行一次。
- scheduledWithFixedDelay:在初始延迟之后,每次执行之后延迟指定时间再次执行。
重点就是上面的三个方法的使用,其他的和ThreadPoolExecutor类似。
Executors
线程池配置的参数很多,框架内置了一些已经配置好参数的线程池,如果懒得去配置参数,可直接使用内置的线程池,可以使用Executors.newXXX方法来构建。
ThreadPoolExecutor有三个配置好参数的线程池:FixedTthreadPool、CacheThreadPool、SingleThreadExecutor。ScheduledThreadPoolExecutor有两个配置好参数的线程池:ScheduledThreadPoolExecutor和SingleThreadScheduledExecutor。
我们看一下这些线程池:
FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
这类线程池只有核心线程,数量固定且不会被回收,等待队列的长度没有上限。
这个线程池的特点就是线程数量固定,适用于负载较重的服务器,可以通过这种线程池来限制线程数量;线程不会被回收,也有比较高的响应速度。
但是等待队列没有上限,在任务过多时有可能发生OOM。
CacheThreadPool
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
这类线程池没有核心线程,而且线程数量上限为Integer.MAX_VALUE,可以认为没有上限。等待队列没有长度,每一个任务到来都会分配一个线程来执行。
这类线程池的特点就是每个任务都会被马上执行,在任务数量过大时可能会创建大量的线程导致系统OOM。但是在一些任务数量多但执行时间短的情景下比较适用。
SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
只有一个线程的线程池,等待队列没有上限,每个任务都会按照顺序被执行。适用于对任务执行顺序有严格要求的场景。
ScheduledThreadPoolExecutor
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
和ScheduledThreadPoolExecutor原生默认的构造器差别不大。
SingleThreadScheduledPoolExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
控制线程只有一个。每个任务会按照顺序先后被执行。
小结
可以看到,Executors只是按照一些比较大的情景方向,对线程池的参数进行简单的配置。那可能会问:那我直接使用线程池的构造器自己设置不就完事了?
确实,阿里巴巴开发者手册也是这样建议的。尽量使用原生的构造器来创建线程池对象,这样我们可以根据实际的情况配置出更加规范的线程池。Executors中的线程池在一些极端的情况下都可能会发生OOM,那么我们自己配置线程池时就要尽量避免这个问题。
最后
关于线程池的使用总体上就介绍到这里,线程池有非常多的优点,希望下次需要创建线程的时候,不会只记得 new Thread
。
下一篇将深入线程池的内部实现原理,如果了解过Android的Handler机制会发现两者的设计几乎一模一样,也是非常有趣的。
希望文章对你有帮助。
参考文献
- 《Java并发编程的艺术》:并发编程必读,作者对一些原理讲的很透彻
- 《Java核心技术卷》:这系列的书主要是讲解框架的使用,不会深入原理,适合入门
- javaGuide:javaGuide,对java知识总结得很不错的一个博客
- Java并发编程:线程池的使用:博客园上一位很优秀的博主,文章写得通俗易懂且不失深度
全文到此,原创不易,觉得有帮助可以点赞收藏评论转发。
笔者才疏学浅,有任何想法欢迎评论区交流指正。
如需转载请评论区或私信交流。另外欢迎光临笔者的个人博客:传送门