阻塞队列分析
前言
在分析多线程的文章中,我们知道了Executors是通过阻塞队列接受任务。例如 FixedThreadPool 使用的是 LinkedBlockingQueue, CachedThreadPool 使用的是 SynchronousQueue。阻塞队列的基类是 BlockingQueue,他的实现类如下所示
BlockingQueue的api我们需要重点关注下,理解了这些api的作用,对于实现类的分析会轻松很多。
类型 | api名称 | 是否阻塞 | 简述 |
放入数据 | offer(anObject) | 否 | 将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false |
offer(E o, long timeout, TimeUnit unit) | 否 | 可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败 | |
put(anObject) | 是 | 把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续 | |
获取数据 | poll(time) | 否 | 取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null |
poll(long timeout, TimeUnit unit) | 否 | 同上 | |
take() | 是 | 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入 | |
drainTo() | 否 | 一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁 |
重点关注表中的阻塞类型的方法,他们是阻塞队列的核心。接下来讲述几个常用的阻塞队列的存取数据api。
ArrayBolckingQueue
在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象。我们先看offer方法
我们再看下 enqueue
以上是非阻塞的存放。我们再看阻塞版本的存放
可以看到,两种方式的差别其实很小,一个是阻塞一段时间后直接返回false,一个是无限期的阻塞。我们再看下取数据的api。
再看下dequeue
以上就是ArrayBlockingQunue的具体分析。
LinkedBlockingQueue
LinkedBlockingQueue是基于链表的阻塞队列,首先需要注意的是LinkedBlockingQueue是可以无界的,当你不指定容量时他默认的大小是 Integer.MAX_VALUE
链表通过内部的Node来实现,可以看出这是个单项链表。
接下来,我们再看下具体的api
我们看下 enqueue,就是一个简单的链表操作。
put的操作和offer相似,这里就省略了,再看下poll
看下dequeue,把头结点取出来,把下个节点设为头结点
以上就是LinkedBlockingQueue。
通过上述两种队列的讲解,我们大概知道了队列存取元素的大致过程,其他队列和上述两种队列的api大致相同,所以接下重点讲述队列的大致特点,不再对api进行详细的描述。
PriorityQueue
PriorityQueue并不是阻塞队列,在这里讲述是因为,接下来的几个队列都是基于他的扩展。PriorityQueue和ArrayBlockingQueue一样,内部维护了一个定长数组,如果不指定长度,默认长度就是11。PriorityQueue是一个优先级队列,元素的顺序并不是按照插入顺序而来,而是按照元素的大小来判断。默认的比较器是从小到大,即队首的元素总是最小的。当然。可以自定义比较器。
PriorityQueue的优先级通过二叉小顶堆实现,他的逻辑结构是一棵完全二叉树,存储结构其实是一个数组。逻辑结构层次遍历的结果刚好是一个数组。这里借鉴网上的一幅图
我们看下添加元素的过程,这里重点是siftUp方法
由上图可以看出,PriorityQunue 最关键的便是 比较-交换 步骤。 添加元素都是先放到最后,然后再与自己的父节点比较
再看下获取元素的步骤
这里还有种情况,就是方法,删除中间的某个元素,这就是上述两种变化的结合,首先删除 i 下标的元素,然后把末尾的元素放置到 i 坐标,先向下比较看看,再向上比较。具体的代码这里就省略了。
以上就是PriorityQueue的具体分析。
DelayQueue
DelayQueue 是延迟阻塞队列,队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
DelayQueue内部通过PriorityQueue实现延迟的权重排序 ,其比较器是 Comparable<Delayed>
主要看存元素的api
这里关键的是
first.getDelay(NANOSECONDS)>0
所以我们在实现 Delayed的时候 实现类里面一定得要有时间可以记录到什么过期,如果过期了一定要返回负数
DelayQueue平时比较少见,但是我们可以用它做一些很灵活的事情,例如缓存过期,空闲连接去除。按照DelayQueue的特性,队首的元素总是最先过期的,我们可以用一个后台线程监听DelayQueue的队首元素。
这里 大家可以参考下用 DelayQueue实现一个过期缓存清除的功能。
PriorityBlockingQueue
PriorityBlockingQueue是优先级阻塞队列,他和我们上文讲的PriorityQueue非常相似。但是呢他又多了点阻塞的东西,准确来说是多了半点。因为在做put操作的时候是不会有阻塞的
就算我们给他一个初始大小,但是如果容量不够还会去扩容
所以我们在往里面put的时候要注意,万一生产者一直在生产,消费者挂了,那么内存很容易就会被耗尽
SynchronousQueue
在线程池里面,大家一定见到过这个队列。这是一个不直接存放元素的队列,他存储的实际上是他内置的Node。它生产产品(即put的时候),如果当前没有人想要消费产品(即当前没有线程执行take),此生产线程必须阻塞,等待一个消费线程调用take操作,take操作将会唤醒该生产线程,同时消费线程会获取生产线程的产品。具体的流程我举个例子
步骤一,调用put线程,存储元素
步骤二,再次调用put线程,存储元素 ,在调用一个take线程,取出一个元素
这个时候take线程会和put(“b”)配对,配对成功后put(“a”)线程就会成为头结点,整个流程如下图所示
SynchronousQueue有两种模式,公平和非公平,默认是非公平的。内部有两种数据结构,分别是是队列和栈来表示公平和非公平两种模式。上图所描述的是非公平模式,即先进后出。具体细节还是得从代码里看。
首先看下存取元素的api
两个方法都调用的 transfer 方法,这就是一个配对的方法。这里截取核心部分的transfer方法供大家参考
这里采用了大量的CAS操作进行更新,初看有点乱,但是心中熟记这是个配对方法,再debug几次。代码就会很清晰了。
参考:
https://blog.csdn.net/u013309870/article/details/71189189 【Java堆结构PriorityQueue完全解析】
http://www.cnblogs.com/leesf456/p/5560362.html 【SynchronousQueue分析】