Java阻塞队列
什么是阻塞队列
阻塞队列是一个支持阻塞的插入和移除的队列。
- 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
- 支持阻塞的移除方法:意思是队列为空时,获取元素(同时移除元素)的线程会被阻塞,等到队列变为非空。
阻塞队列用法
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里获取元素的线程。
当阻塞队列不可用时,会有四种相应的处理方式:
处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入操作 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除操作 | remove() | poll() | take() | poll(time, unit) |
获取操作 | element() | peek() | 不可用 | 不可用 |
- 返回特殊值:插入元素时,会返回是否插入成功,成功返回true。如果是移除方法,则是从队列中取出一个元素,没有则返回null。
- 一直阻塞:当阻塞队列满时,如果生产者线程往队列里面put元素,则生产者线程会被阻塞,知道队列不满或者响应中断退出。当队列为空时,如果消费者线程从队列里take元素。
- 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定时间,生产者线程就会退出。
如果是无界阻塞队列,队列则不会出现满的情况。
阻塞队列
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列
-
LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列
- DelayQueue:一个使用优先级队列实现的无界阻塞队列
- SynchronousQueue:一个不存储元素的阻塞队列
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列
1.ArrayBlockingQueue
此队列按照先进先出(FIFO)的原则对元素进行排序
默认情况下不保证线程公平地访问队列(所谓公平是指当队列可用时,先被阻塞的线程先访问队列)
为了保证公平性通常会降低吞吐量。
公平锁是利用了可重入锁的公平锁来实现。
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
2.ArrayBlockingQueue
此队列按照先进先出(FIFO)的原则对元素进行排序
默认长度为Integer.MAX_VALUE
3.PriorityBlockingQueue
默认情况下元素采取自然顺序升序排列
可以自定义Comparator
或者自定义类实现compareTo()
方法来指定排序规则
不支持同优先级元素排序
4.DelayQueue
队列使用PriorityQueue
来实现,队列中的元素必须实现Delayed
接口
只有在延时期满才能从队列中提取元素
阻塞队列原理
如果队列是空的,消费者会一直等待,当生产者添加元素时,消费者是如何知道当前队列有元素的呢?
使用通知模式实现。所谓通知模式,就是当生产者往满的队列添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。
以ArrayBlockingQueue
为例子
/** items 存放队列中的元素*/
final Object[] items;
/** take 操作的索引 */
int takeIndex;
/** put 操作的索引 */
int putIndex;
/** 队列中元素个数 */
int count;
/** Main lock guarding all access */
final ReentrantLock lock;
/** 控制生产者 takes 操作的 Condition */
private final Condition notEmpty;
/** 控制消费者 put 操作的 Condition */
private final Condition notFull;
put
操作
public void put(E e) throws InterruptedException {
checkNotNull(e); //判断 e == null
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //获取锁,与lock不同,可以尝试中断阻塞
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
入队操作,入队之后唤醒消费者线程。
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
notFull.await();
中其实调用了park
方法,先使用setBlocker
保存一下将要阻塞的线程,然后调用本地方法UNSAFE.park(boolean isAbsolute, long time)
进行阻塞。
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}