为什么要用线程池?

  • 单独创建线程的缺点:  

  a. 每次new Thread新建对象性能差。
  b. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。
  c. 缺乏更多功能,如定时执行、定期执行、线程中断。

  • 创建线程池的优点: 

  a. 重用存在的线程,减少对象创建、消亡的开销,性能佳。
  b. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
  c. 提供定时执行、定期执行、单线程、并发数控制,指定队列大小,失败策略等功能,可以根据我们项目的需要自定义创建不同功能的线程池。

  线程池作用就是限制系统中执行线程的数量。
  根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。

两种创建方式

  • Executors创建方式 

  但是阿里开发手册上不建议使用这种方式,因为有太多不可控因素会导致oom异常,切无法任务进行干预。如果非要使用这种能实现这些功能的线程池可以用gauva提供的线程工厂来实现类似的功能。

  • Executors.newCachedThreadPool() 
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

  示例:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Chi {
    public static void main(String[] args) {
        /*没有线程池的写法
        Runnable r = new MyRunnable();
        Thread t = new Thread(r);
        t.start();*/
        ExecutorService e =Executors.newCachedThreadPool();
        Runnable r = new MyRunnable();
        e.submit(r);//获取线程池中的某一个线程对象,然后调用runnable接口中的run方法
        e.submit(r);
        e.submit(r);
        e.submit(r);//注意run方法运行完,线程中的线程并不消耗,而是归还到池中
        e.shutdown();
    }
    }

class MyRunnable implements Runnable{
@Override
    public void run() {
        System.out.println("给我一个线程:"+Thread.currentThread().getName());
        try {
            System.out.println("线程开始消耗资源"+Thread.currentThread().getName());
            Thread.sleep(2000);
            System.out.println("线程使用完毕"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("归还到线程池中"+Thread.currentThread().getName());
    }
    }

  改进后创建newCachedThreadPool方式(引入guava依赖):

private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fengzheng" + "-%d").setDaemon(true).build();

    public static ExecutorService createCacheThreadPool(){
        int coreSize = 10;
        int maxSize = 20;
        return new ThreadPoolExecutor(coreSize, maxSize, 10L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), threadFactory, new ThreadPoolExecutor.AbortPolicy());
    } 

  说明:创建一个可缓存线程池,应用中存在的线程数可以无限大(最大线程数为int最大值)。当提交任务速度高于线程池中任务处理速度时,缓存线程池会不断的创建线程,线程数完全取决于jvm可以创建的线程数,直到资源耗尽会抛出oom异常;如果在执行第二个任务时,第一个任务已经完成,那么会复用第一个任务的线程执行第二个任务,如果一个线程超过60秒没有被使用,就会被线程池回收。所以大家要慎用此方法,适用于提交短期的异步小程序,以及负载较轻的服务器。

 

  • Executors.newFixedThreadPool(2)
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

  示例:   

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Chi {
    public static void main(String[] args) {
        /*没有线程池的写法
        Runnable r = new MyRunnable();
        Thread t = new Thread(r);
        t.start();*/

ExecutorService e =Executors.newFixedThreadPool(2);//创建一个包含两个线程的线程池
        Runnable r = new MyRunnable();
        e.submit(r);//获取线程池中的某一个线程对象,然后调用runnable接口中的run方法
        e.submit(r);
        e.submit(r);
        e.submit(r);//注意run方法运行完,线程中的线程并不消耗,而是归还到池中
        e.shutdown();
    }
    }

class MyRunnable implements Runnable{
@Override
    public void run() {
        System.out.println("给我一个线程:"+Thread.currentThread().getName());
        try {
            System.out.println("线程开始消耗资源"+Thread.currentThread().getName());
            Thread.sleep(2000);
            System.out.println("线程使用完毕"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("归还到线程池中"+Thread.currentThread().getName());
    }
    }

  改进后创建newFixedThreadPool方式(引入gauva依赖):  

private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fengzheng" + "-%d").setDaemon(true).build();

public static ExecutorService createFixedThreadPool() {
        int poolSize = 5;
        int queueSize = 10;
        ExecutorService executorService = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize), threadFactory, new ThreadPoolExecutor.AbortPolicy());
        return executorService;
    }

  说明:创建一个线程数固定的线程池,可以根据系统的资源进行设置,如:Runtime.getRuntime().availableProcessors()。线程不会被回收,除非调用shutdown()方法才会回收,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。此方法可控制线程最大并发数,超出线程数据的任务会放到队列中,但是从源码中可以看到此队列为无界队列,所以如果我们一次查出的数据过多很有可能会导致oom异常,因为队列会无限扩充,真正的导致OOM的其实是LinkedBlockingQueue.offer方法。。使用于为了满足资源管理需求而需要限制当前线程数量的场合,使用于负载比较重的服务器但是要注意控制任务数量。

 

  • Executors.newScheduledThreadPool(2)

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}

  改进后创建方式(引入guava依赖):

private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fengzheng" + "-%d").setDaemon(true).build();

    private static CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException {
        Task task = new Task();
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2, threadFactory);
        executorService.scheduleAtFixedRate(task,0L,5L, TimeUnit.SECONDS);
        latch.await();
    }

    static class Task implements Runnable{
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "executing");
        }
    }

 

  说明:计划型线程池,可以设置固定时间的延时或者定期执行任务,多数情况下可用来替代Timer类,同样是看线程池中有没有空闲线程,如果有,直接拿来使用,如果没有,则新建线程加入池。使用的是 DelayedWorkQueue 作为等待队列,这中类型的队列会保证只有到了指定的延时时间,才会执行任务

  示例:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Chi {
    public static void main(String[] args) {
        /*没有线程池的写法
        Runnable r = new MyRunnable();
        Thread t = new Thread(r);
        t.start();*/
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        ScheduledThreadPoolExecutor e = new ScheduledThreadPoolExecutor(3);//参数表示线程容量
        System.out.println(simpleDateFormat.format(new Date()));
        // 但是如果执行任务时间大约2s则不会并发执行后续任务将会延迟。
        ScheduledFuture<?> resultFuture = e.scheduleAtFixedRate(new MyRunnable(), 0, 2000, TimeUnit.MILLISECONDS);//第一个参数任务,第二个参数表示执行任务前等待的时间,第三个参数表示任务启动间隔时间,第四参数表示时间单位
        e.scheduleAtFixedRate(new MyRunnable1(), 0, 2000, TimeUnit.MILLISECONDS);//第一个参数任务,第二个参数表示执行任务前等待的时间,第三个参数表示任务启动间隔时间,第四参数表示时间单位
        // // 由于是定时任务,一直不会返回
        //Object object = resultFuture.get();
    }
}

class MyRunnable implements Runnable{
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+"给我一个线程:"+simpleDateFormat.format(new Date()));
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
class MyRunnable1 implements Runnable{
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+"给我一个线程1:"+simpleDateFormat.format(new Date()));
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

  • Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

  说明:创建一个单线程的线程池,它只会用唯一的工作线程来执行任务,也就相当于单线程串行执行任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。使用 LinkedBlockingQueue 作为等待队列。

  示例:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Chi {
    public static void main(String[] args) {
        /*没有线程池的写法
        Runnable r = new MyRunnable();
        Thread t = new Thread(r);
        t.start();*/

ExecutorService e =Executors.newSingleThreadExecutor();//创建一个单线程的线程池
        e.submit(new MyRunnable());
        e.submit(new MyRunnable());
        e.submit(new MyRunnable());
        e.submit(new MyRunnable());
        e.submit(new MyRunnable());
        e.submit(new MyRunnable());
        e.shutdown();
    }
    }

class MyRunnable implements Runnable{
@Override
    public void run() {
        System.out.println("给我一个线程:"+Thread.currentThread().getName());
        try {
            System.out.println("线程开始消耗资源"+Thread.currentThread().getName());
            Thread.sleep(2000);
            System.out.println("线程使用完毕"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("归还到线程池中"+Thread.currentThread().getName());
    }
    }

 

  • Executors.newSingleThreadScheduledExecutor();

  说明:创建一个单例线程池,定期或延时执行任务。

  示例:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * 描述:创建一个单例线程池,定期或延时执行任务。
 */
public class ThreadpoolsSingleThreadScheduled {
    /**
     * 我们获取四次次线程,观察4个线程地址
     * @param args
     */
    public static  void main(String[]args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();


        for(int i=0;i<10;i++)
        {
            final int index=i;
            scheduledExecutorService.schedule(new ThreadForpools(index),3, TimeUnit.SECONDS);

        }
    }

}

 

  • Executors.newWorkStealingPool(3);   
public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
 }

  说明:创建一个带并行级别的线程池,并行级别决定了同一时刻最多有多少个线程在执行,如不穿如并行级别参数,将默认为当前系统的CPU个数。

  示例:  

public class ThreadpoolsWorkStealingPool {
  public static void main(String[] args) throws Exception {
     // 设置并行级别为2,即默认每时每刻只有2个线程同时执行
     ExecutorService executorService = Executors.newWorkStealingPool(3);
     for (int i = 1; i <= 50; i++) {
         final int count=i;
         executorService.submit(new ThreadForpools(count));
     }
     while(true){
         //主线程陷入死循环,来观察结果,否则是看不到结果的
     }
  }
}

 对比:

  1. 若自身对性能有很大需求,且对于机器性能、代码能力等有足够自信,使用ThreadPoolExecutor的构造方法是最合适的。
  2. newSingleThreadExecutor()是构造只有一个线程的线程池,保存任务的队列是无界的,可接收所有任务,但是同时只有一个线程执行任务
  3. newFixedThreadPool()是构造可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程,在需要时使用提供的 ThreadFactory 创建新线程。
  4. newScheduledThreadPool()创建一个可重用线程池(最大线程数为int最大值),它可安排在给定延迟后运行命令或者定期地执行(因为使用DelayedWorkQueue()队列)。
  5. newCachedThreadPool()是构造一个可根据需要创建新线程的线程池(最大线程数为int最大值),但是在以前构造的线程可用时将重用它们。调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。

 你可以使用JDK自带的监控工具来监控我们创建的线程数量,运行一个不终止的线程,创建指定量的线程,来观察。
 工具目录:C:\Program Files\Java\jdk1.6.0_06\bin\jconsole.exe

  • ThreadPoolExecutor创建方式

  通过第一种创建线程池的方式的源码可以看出其实也是通过ThreadPoolExecutor方式来实现的,下面咱们重点讲一下ThreadPoolExecutor实现方式:

  ThreadPoolExecutor构造函数:

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
   //这种情况下,一旦提交的线程数超过当前可用线程数时,就会抛出java.util.concurrent.RejectedExecutionException,这是因为当前线程池使用的队列是有边界队列,队列已经满了便无法继续处理新的请求。但是异常(Exception)总比发生错误(Error)要好
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

  主要参数解释:

corePoolSize

核心线程数,当有任务进来的时候,如果当前线程数还未达到 corePoolSize 个数,则创建核心线程,核心线程有几个特点:

1、在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程,避免第一次调用缓慢;

    2、当线程数未达到核心线程最大值的时候,新任务进来,即使有空闲线程,也不会复用,仍然新建核心线程;

   3、核心线程一般不会被销毁,即使是空闲的状态,但是如果通过方法 allowCoreThreadTimeOut(boolean value) 设置为 true 时,超时也同样会被销毁;

maximumPoolSize

  除了有核心线程外,有些策略是当核心线程完全无空闲的时候,还会创建一些临时的线程来处理任务,maximumPoolSize 就是核心线程 + 临时线程的最大上限。临时线程有一个超时机制,超过了设置的空闲时间没有事儿干,就会被销毁。

keepAliveTime

  这个就是上面两个参数里所提到的超时时间,也就是线程的最大空闲时间,默认用于非核心线程,通过 allowCoreThreadTimeOut(boolean value) 方法设置后,也会适用于核心线程,直到线程池的线程数为0。

unit

  这个参数配合上面的 keepAliveTime ,指定超时的时间单位,秒、分、时等。 

TimeUnit.DAYS;               //
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒

workQueue

A.队列保存策略:

  若运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队;若运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程;若无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。 
B.队列选取通常策略: 
  a.直接提交:直接提交队列(如SynchronousQueue),此种队列将任务直接提交给线程而不保存他们,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。如newCachedThreadPool 
  b.无界队列。使用无界队列(如 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize(因此,maximumPoolSize 的值也就无效了)。当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。如newFixedThreadPool和newSingleThreadExecutor 。(警惕任务队列无限堆积的风险)
  c.有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。 

ArrayBlockingQueue:是一个用数组实现的有界阻塞队列,必须设置容量。
LinkedBlockingQueue:是一个用链表实现的有界阻塞队列,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。
SynchronousQueue:

threadFactory

  它是一个接口,用于实现生成线程的方式、定义线程名格式、是否后台执行等等,可以用 Executors.defaultThreadFactory() 默认的实现即可,也可以用 Guava 等三方库提供的方法实现,如果有特殊要求的话可以自己定义。它最重要的地方应该就是定义线程名称的格式,便于排查问题了吧。

handler

  当没有空闲的线程处理任务,并且等待队列已满(当然这只对有界队列有效),再有新任务进来的话,就要做一些取舍了,而这个参数就是指定取舍策略的,有下面四种策略可以选择:

ThreadPoolExecutor.AbortPolicy:直接抛出异常,这是默认策略; 
ThreadPoolExecutor.DiscardPolicy:直接丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后将新来的任务加入等待队列
ThreadPoolExecutor.CallerRunsPolicy:由线程池所在的线程处理该任务,比如在 main 函数中创建线程池,如果执行此策略,将有 main 线程来执行该任务  
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 

 线程池提交方式对比 

executor2.execute(myThreadImplements);//不关心返回结果
Future futureTask
= executor1.submit(myThreadCallable);//关心返回结果, 源码函数<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);//这种调用和第二种类似,但是它的返回值是传进来的,而第二种方式是call返回的
 Future<?> submit(Runnable task);//不关心返回结果,虽然返回Future,但是其get()方法总是返回null
 
ScheduledFuture scheduledFuture = executor3.schedule(myThreadImplements, 100L, TimeUnit.SECONDS);
  • execute方式:  

  execute属于ExecutorService的父接口Executor中的方法,所有线程池都具有此方法。

  • submit方式:

  submit属于ExecutorService的方法。具有返回值Future ,当内部线程使用实现Callable方式是现实时(具有返回值),可以接收返回值。上述所有线程池都具有此方法。但是其实ExecutorService接口中中有多个submit重载方法

  • schedule方式

  schedule属于ExecutorService的子孙类ScheduledThreadPoolExecutor方法。具有返回值Future ,当内部线程使用实现Callable方式是现实时(具有返回值),可以接收返回值。能实现定时任务和延迟任务。上述线程池中只有newScheduledThreadPool创建的线程池具有此方法。

  线程池的处理结果、以及处理过程中的异常都被包装到Future中,并在调用Future.get()方法时获取,执行过程中的异常会被包装成ExecutionExceptionsubmit()方法本身不会传递结果和任务执行过程中的异常。获取执行结果的代码可以这样写:

 

ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<Object> future = executorService.submit(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            throw new RuntimeException("exception in call~");// 该异常会在调用Future.get()时传递给调用者
        }
    });
    
try {
  Object result = future.get();
} catch (InterruptedException e) {
  // interrupt
} catch (ExecutionException e) {
  // exception in Callable.call()
  e.printStackTrace();
} 

  获取单个结果

  过submit()向线程池提交任务后会返回一个Future,调用V Future.get()方法能够阻塞等待执行结果,V get(long timeout, TimeUnit unit)方法可以指定等待的超时时间。

  获取多个结果

  如果向线程池提交了多个任务,要获取这些任务的执行结果,可以依次调用Future.get()获得。但对于这种场景,我们更应该使用ExecutorCompletionService,该类的take()方法总是阻塞等待某一个任务完成,然后返回该任务的Future对象。向CompletionService批量提交任务后,只需调用相同次数的CompletionService.take()方法,就能获取所有任务的执行结果,获取顺序是任意的,取决于任务的完成顺序。ExecutorCompletionService提供了等待所有任务执行结束的有效方式,如果要设置等待的超时时间,则可以通过CountDownLatch完成,:

void solve(Executor executor, Collection<Callable<Result>> solvers)
   throws InterruptedException, ExecutionException {
   
   CompletionService<Result> ecs = new ExecutorCompletionService<Result>(executor);// 构造器
   
   for (Callable<Result> s : solvers){// 提交所有任务
       ecs.submit(s);
   }    
   int n = solvers.size();
   for (int i = 0; i < n; ++i) {// 获取每一个完成的任务
       Result r = ecs.take().get();
       if (r != null)
           use(r);
   }
}

有关线程池几个重要的类

   ScheduledExecutorService:能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。

  ScheduledThreadPoolExecutor: 继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。

  从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService;AbstractExecutorService是一个抽象类AbstractExecutorService是一个抽象类,它实现了ExecutorService接口;而ExecutorService又是继承了Executor接口。

  看一下这几个类的源码:

public abstract class AbstractExecutorService implements ExecutorService {
 
     
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
    public Future<?> submit(Runnable task) {};
    public <T> Future<T> submit(Runnable task, T result) { };
    public <T> Future<T> submit(Callable<T> task) { };
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
    };
}

  

public interface ExecutorService extends Executor {
 
    void shutdown();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
 
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

  

public interface Executor {
    void execute(Runnable command);
}

  execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

  submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。

  shutdown()和shutdownNow()是用来关闭线程池的。

  还有很多其他的方法:

  比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友可以自行查阅API。

 

线程池的原理及如何配置线程池

ThreadPoolExecutor的使用

 多线程系列文章

版权声明:本文为htyj原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/htyj/p/10849020.html