java线程池技术(一):ThreadFactory与BlockingQueue
一、ThreadFactory概述以及源码分析
ThreadFactory很简单,就是一个线程工厂也就是负责生产线程的,我们看下ThreadFactory源码;
1 public interface ThreadFactory { 2 3 /** 4 * Constructs a new {@code Thread}. Implementations may also initialize 5 * priority, name, daemon status, {@code ThreadGroup}, etc. 6 * 7 * @param r a runnable to be executed by new thread instance 8 * @return constructed thread, or {@code null} if the request to 9 * create a thread is rejected 10 */ 11 Thread newThread(Runnable r); 12 }
很简单吧,就是一个接口,newThread方法就是用来生产线程的,子类需要实现这个方法来根据自己规则生产相应的线程。
那安卓中什么地方用到了ThreadFactory呢?稍有经验的就会知道线程池中用到了,我们看下平常使用线程池是怎么创建的,以下是我一个项目中用到的:
1 // 创建线程池对象 2 public static final Executor poolExecutor = new ThreadPoolExecutor( 3 CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, 4 new LinkedBlockingQueue<Runnable>());
咦?没有用到线程池啊,别急,我们看下ThreadPoolExecutor创建的源码:
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue) { 6 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 7 Executors.defaultThreadFactory(), defaultHandler); 8 }
看到了吧,最终调用的如下构造函数来创建线程池:
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 ThreadFactory threadFactory, 7 RejectedExecutionHandler handler) { 8 if (corePoolSize < 0 || 9 maximumPoolSize <= 0 || 10 maximumPoolSize < corePoolSize || 11 keepAliveTime < 0) 12 throw new IllegalArgumentException(); 13 if (workQueue == null || threadFactory == null || handler == null) 14 throw new NullPointerException(); 15 this.corePoolSize = corePoolSize; 16 this.maximumPoolSize = maximumPoolSize; 17 this.workQueue = workQueue; 18 this.keepAliveTime = unit.toNanos(keepAliveTime); 19 this.threadFactory = threadFactory; 20 this.handler = handler; 21 }
ThreadFactory传入的参数是Executors.defaultThreadFactory(),那我们继续看下Executors中defaultThreadFactory吧:
1 /** 2 * The default thread factory 3 */ 4 static class DefaultThreadFactory implements ThreadFactory { 5 private static final AtomicInteger poolNumber = new AtomicInteger(1); 6 private final ThreadGroup group; 7 private final AtomicInteger threadNumber = new AtomicInteger(1); 8 private final String namePrefix; 9 10 DefaultThreadFactory() { 11 SecurityManager s = System.getSecurityManager(); 12 group = (s != null) ? s.getThreadGroup() : 13 Thread.currentThread().getThreadGroup(); 14 namePrefix = "pool-" + 15 poolNumber.getAndIncrement() + 16 "-thread-"; 17 } 18 19 public Thread newThread(Runnable r) { 20 Thread t = new Thread(group, r, 21 namePrefix + threadNumber.getAndIncrement(), 22 0); 23 if (t.isDaemon()) 24 t.setDaemon(false); 25 if (t.getPriority() != Thread.NORM_PRIORITY) 26 t.setPriority(Thread.NORM_PRIORITY); 27 return t; 28 } 29 }
DefaultThreadFactory实现了ThreadFactory接口,newThread中生产了一个个线程并且设置为不是守护线程,线程优先级均为Thread.NORM_PRIORITY。
在我们使用线程池的时候如果不自己创建线程工厂类,那么系统会给我们创建一个默认的线程工厂来生产线程(Executors中defaultThreadFactory())
好了,关于线程工厂就见到这里了,知道有这么个玩意用来生产线程的就可以了。
二、BlockingQueue概述以及源码分析
BlockingQueue顾名思义:阻塞队列,简单说就是放入,取出数据都会发生阻塞。比如取出数据时发现容器没有数据,就会等待产生阻塞一直等到有数据
为止,同样放入数据时如果容器已经满了,那么就回等待一直到容器有空间可以放入数据。
BlockingQueue就是一个接口,定义了插入,取出数据的接口,如下:
1 public interface BlockingQueue<E> extends Queue<E> { 2 3 //添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常 4 boolean add(E e); 5 6 //添加元素到队列里,成功返回true,失败返回false 7 boolean offer(E e); 8 9 //添加元素到队列里,如果容量满了会阻塞直到容量不满 10 void put(E e) throws InterruptedException; 11 12 13 //添加元素到队列里,如果容器已满会阻塞列队,但是不会一直阻塞,只会阻塞timeout时间,在这期间内添加成功则返回true,否则返回false 14 boolean offer(E e, long timeout, TimeUnit unit) 15 throws InterruptedException; 16 17 18 //从队列中获取元素,如果队列为空,则一直阻塞 19 E take() throws InterruptedException; 20 21 //从队列中获取元素,如果容器为空会阻塞列队,但是不会一直阻塞,只会阻塞timeout时间,在这期间内获取成功则返回对应元素,否则返回null 22 E poll(long timeout, TimeUnit unit) 23 throws InterruptedException; 24 25 //存储数据的队列剩余空间大小 26 int remainingCapacity(); 27 28 //删除指定的元素,成功返回true,失败返回false 29 boolean remove(Object o); 30 31 public boolean contains(Object o); 32 33 int drainTo(Collection<? super E> c); 34 35 int drainTo(Collection<? super E> c, int maxElements); 36 }
主要方法已经给出注释。
BlockingQueue的具体子类如下图所示:
其中最最常用的就是ArrayBlockingQueue以及LinkedBlockingQueue,我们以ArrayBlockingQueue为例详细分析一下实现过程。
三、ArrayBlockingQueue源码分析
ArrayBlockingQueue内部是以数组为容器盛放元素,并且放入和取出元素的时候使用同一个锁,也就是放入的时候不能同时取出元素。
ArrayBlockingQueue中主要属性:
1 //存储元素的数组,是个循环数组,至于为什么是循环数组下面会讲到 2 final Object[] items; 3 4 //下一次拿数据的时候的索引 5 int takeIndex; 6 7 //下一次放数据的时候的索引 8 int putIndex; 9 10 //队列中已经存储元素的个数 11 int count; 12 13 //锁,只有这一把锁 14 final ReentrantLock lock; 15 16 //等待拿数据的的条件对象 17 private final Condition notEmpty; 18 19 //等待放数据的的条件对象 20 private final Condition notFull;
已经给出详细注释就不一一详细解释了。
接下来我们看下构造函数;
1 public ArrayBlockingQueue(int capacity) { 2 this(capacity, false); 3 } 4 5 public ArrayBlockingQueue(int capacity, boolean fair) { 6 if (capacity <= 0) 7 throw new IllegalArgumentException(); 8 this.items = new Object[capacity]; 9 lock = new ReentrantLock(fair); 10 notEmpty = lock.newCondition(); 11 notFull = lock.newCondition(); 12 }
初始化的时候我们需要指定盛放元素容器的大小,并且初始化一些属性。
接下来我们分析下ArrayBlockingQueue中添加的方法:add,offer以及put方法。
先看add方法:
1 public boolean add(E e) { 2 return super.add(e); 3 }
直接调用的父类的方法,我们只好去看看父类中的add方法了:
1 public boolean add(E e) { 2 if (offer(e)) 3 return true; 4 else 5 throw new IllegalStateException("Queue full"); 6 }
是不是逻辑很简单,add方法内部调用了offer方法如果offer方法返回true则add直接返回true,否则抛出IllegalStateException异常。
接下来我们看下offer方法:
1 public boolean offer(E e) { 2 if (e == null) throw new NullPointerException(); 3 final ReentrantLock lock = this.lock; 4 lock.lock(); 5 try { 6 if (count == items.length) 7 return false; 8 else { 9 enqueue(e); 10 return true; 11 } 12 } finally { 13 lock.unlock(); 14 } 15 }
第2行,检查是否为null,为null则抛出空指针异常。
3,4行加锁,保证只有一个线程操作。
6,7行检查当前容器是否已将满了,如果满了则不能再放入元素,直接返回false。
9,10行如果容器没满则执行enqueue方法放入元素,然后返回true,enqueue方法后面会分析。
13行解除锁。以上就是offer方法的逻辑,比较简单,该说的都说了。
接下来分析put方法:
1 public void put(E e) throws InterruptedException { 2 if (e == null) throw new NullPointerException(); 3 final ReentrantLock lock = this.lock; 4 lock.lockInterruptibly(); 5 try { 6 while (count == items.length) 7 notFull.await(); 8 enqueue(e); 9 } finally { 10 lock.unlock(); 11 } 12 }
第4行这里调用的是lockInterruptibly方法,与lock方法相比,lockInterruptibly可以被中断,中断的时候产生InterruptedException异常。
6,7行同样判断容器是否已经满了,如果已经满了则执行wait逻辑等待,这里就是阻塞队列的核心,是不是觉得原来如此啊。
8行,如果容器有空间执行enqueue方法,向容器中加入元素。
offer与put都用到了enqueue方法向容器中加入元素,接下来我们看下enqueue方法:
1 private void enqueue(E x) { 2 // assert lock.getHoldCount() == 1; 3 // assert items[putIndex] == null; 4 final Object[] items = this.items; 5 items[putIndex] = x; 6 if (++putIndex == items.length) putIndex = 0; 7 count++; 8 notEmpty.signal(); 9 }
4,5行就是向数组items中放入元素x。
6行,放元素的索引putIndex加1后与数组长度比较,如果达到数组长度则将putIndex置为0,相当于下一次放入元素位置为数组的第一次位置,从头开始放入元素,上面说过items是个循环数组,就是在这里体现出来的,如果我们初始化的时候设定容器items大小为10,然而我们不停放入数据也是没问题的,只不过后面加入的数据会覆盖前面的数据。
8行,唤醒取数据的线程,告诉其哥们我放入了一个数据你可以取数据了。
到这放入数据的核心部分就分析完了,是不是很简单???放入数据还有个offer(E e, long timeout, TimeUnit unit)方法,同样很简单,可以自行分析。
接下来分析取出数据的方法,取出数据主要是poll与take方法。
先来看下poll方法;
1 public E poll() { 2 final ReentrantLock lock = this.lock; 3 lock.lock(); 4 try { 5 return (count == 0) ? null : dequeue(); 6 } finally { 7 lock.unlock(); 8 } 9 }
2,3行同样是先锁住,保证单线程操作。
5行,判断当前线程中元素数量是否为0,如果为0则没有元素返回null,否则执行dequeue方法取出数据并返回,后续会分析dequeue方法。
7行解除锁。
poll方法是不是很简单,同样poll(long timeout, TimeUnit unit)也不难分析可自行分析。
接下来看下take方法:
1 public E take() throws InterruptedException { 2 final ReentrantLock lock = this.lock; 3 lock.lockInterruptibly(); 4 try { 5 while (count == 0) 6 notEmpty.await(); 7 return dequeue(); 8 } finally { 9 lock.unlock(); 10 } 11 }
take方法也不难理解,核心就是5,6行逻辑,如果count为0也就是容器内没有数据则执行wait方法,线程一直处于等待状态,如果不为0则执行dequeue
方法取出数据。
我们再看下dequeue方法:
1 private E dequeue() { 2 // assert lock.getHoldCount() == 1; 3 // assert items[takeIndex] != null; 4 final Object[] items = this.items; 5 @SuppressWarnings("unchecked") 6 E x = (E) items[takeIndex]; 7 items[takeIndex] = null; 8 if (++takeIndex == items.length) takeIndex = 0; 9 count--; 10 if (itrs != null) 11 itrs.elementDequeued(); 12 notFull.signal(); 13 return x; 14 }
6,7,13行从数组items中取出数据,并将原数组位置处置为null,最后13行处返回取出的数据。
8行,取数据索引takeIndex加1后与数组总长度比较如果达到数组长度则将takeIndex置为0,下一次从数组开始处取数据。
10,11行通知迭代器有数据取出。
12行通知放入数据的线程有数据取出了,你可以放入数据了。
好了,以上就是ArrayBlockingQueue中放入取出数据的操作源码分析,多线程中总会提到生产者消费者模式,其实用ArrayBlockingQueue实现是很简单的,ArrayBlockingQueue只是将wait,notify操作进行了封装而已。
LinkedBlockingQueue源码就不一一分析了,主要区别的LinkedBlockingQueue内部是用链表来存储数据的,并且有两把锁,放数据锁与取数据锁,也就是放入数据的线程和取出数据的线程可以同时操作LinkedBlockingQueue,而ArrayBlockingQueue中放数据线程与取数据线程是互斥的,不能同时操作,LinkedBlockingQueue初始化的时候可以不指定容器大小,如果不指定则容器大小为Integer.MAX_VALUE,而ArrayBlockingQueue则必须指定容器大小。
好了以上就是本篇全部内容了,如果对你有用可自行打赏