• 数组大小
  1. final Object[] items;
  • 下一个进队元素的下标
  1. /** items index for next take, poll, peek or remove */
  2. int takeIndex;
  • 下一个出队元素的下标
  1. /** items index for next put, offer, or add */
  2. int putIndex;
  • 队列中元素的数目
  1. /** Number of elements in the queue */
  2. int count;
  • 出队和入队需要的锁
  1. /*
  2. * Concurrency control uses the classic two-condition algorithm
  3. * found in any textbook.
  4. */
  5. /** Main lock guarding all access */
  6. final ReentrantLock lock;
  • 出队条件
  1. /** Condition for waiting takes */
  2. private final Condition notEmpty;
  • 入队条件
  1. /** Condition for waiting puts */
  2. private final Condition notFull;
  • 配置容量,先创建是否公平公平访问
  1. public ArrayBlockingQueue(int capacity, boolean fair) {
  2. if (capacity <= 0)
  3. throw new IllegalArgumentException();
  4. this.items = new Object[capacity];
  5. lock = new ReentrantLock(fair);
  6. notEmpty = lock.newCondition();
  7. notFull = lock.newCondition();
  8. }
  • 从源码可以看到,创建一个object数组,然后创建一个公平或非公平锁,然后创建出队条件和入队条件
  1. public boolean offer(E e) {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. lock.lock();
  5. try {
  6. if (count == items.length)
  7. return false;
  8. else {
  9. enqueue(e);
  10. return true;
  11. }
  12. } finally {
  13. lock.unlock();
  14. }
  15. }
  • 首先检查是否为null
  • 然后lock锁住
  • 如果当前数目count已经为初始的时候容量,这时候自己返回false
  • 否则的话执行enqueue方法
  1. private void enqueue(E x) {
  2. // assert lock.getHoldCount() == 1;
  3. // assert items[putIndex] == null;
  4. final Object[] items = this.items;
  5. items[putIndex] = x;
  6. if (++putIndex == items.length)
  7. putIndex = 0;
  8. count++;
  9. notEmpty.signal();
  10. }
  • 将新的元素添加到数组的下一个进队的位置
  • 然后notEmpty出队条件唤醒,这个时候可以进行出队
  • 执行enqueue后然后释放锁
  1. public boolean offer(E e, long timeout, TimeUnit unit)
  2. throws InterruptedException {
  3. checkNotNull(e);
  4. long nanos = unit.toNanos(timeout);
  5. final ReentrantLock lock = this.lock;
  6. lock.lockInterruptibly();
  7. try {
  8. while (count == items.length) {
  9. if (nanos <= 0)
  10. return false;
  11. nanos = notFull.awaitNanos(nanos);
  12. }
  13. enqueue(e);
  14. return true;
  15. } finally {
  16. lock.unlock();
  17. }
  18. }
  • 方法大致和前面的一致,不同的时候是当队列满的时候,会等待一段时间,此时入队条件等待一段时间,一段时间后继续进入循环进行判断队列还满
  • 当队列不满的时候执行enqueue
  • 调用父类的add方法

    1. public boolean add(E e) {
    2. return super.add(e);
    3. }
  • 父类AbstractQueue的add方法

  1. public boolean add(E e) {
  2. if (offer(e))
  3. return true;
  4. else
  5. throw new IllegalStateException("Queue full");
  6. }
  • 执行offer方法,这个时候可以对比上面直接调用offer,offer方法如果入队失败会直接返回false,而add方法会抛出异常
  1. public void put(E e) throws InterruptedException {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. lock.lockInterruptibly();
  5. try {
  6. while (count == items.length)
  7. notFull.await();
  8. enqueue(e);
  9. } finally {
  10. lock.unlock();
  11. }
  12. }
  • 和限定时间的offer方法不同,当队列满的时候,会一直等待,直到有人唤醒
  1. public E poll() {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. return (count == 0) ? null : dequeue();
  6. } finally {
  7. lock.unlock();
  8. }
  9. }
  • 首先执行lock方法锁定
  • 如果当前队中无元素,那么返回null,否则执行dequeue方法
  1. private E dequeue() {
  2. // assert lock.getHoldCount() == 1;
  3. // assert items[takeIndex] != null;
  4. final Object[] items = this.items;
  5. @SuppressWarnings("unchecked")
  6. E x = (E) items[takeIndex];
  7. items[takeIndex] = null;
  8. if (++takeIndex == items.length)
  9. takeIndex = 0;
  10. count--;
  11. if (itrs != null)
  12. itrs.elementDequeued();
  13. notFull.signal();
  14. return x;
  15. }
  • 根据出队下标取出元素,然后将该位置置为null
  • 将出队下标加一,如果出队下标等于了数组的大小,出队下标置为0
  • 队中元素数量减一
  • notFull唤醒,此时可以唤醒入队阻塞的线程
  1. public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  2. long nanos = unit.toNanos(timeout);
  3. final ReentrantLock lock = this.lock;
  4. lock.lockInterruptibly();
  5. try {
  6. while (count == 0) {
  7. if (nanos <= 0)
  8. return null;
  9. nanos = notEmpty.awaitNanos(nanos);
  10. }
  11. return dequeue();
  12. } finally {
  13. lock.unlock();
  14. }
  15. }
  • 与offer的指定时间和没有指定时间类似,poll指定时间的方法和没有指定时间的poll思路大致是一样的
  • 当此时队列为空的,为等待一段时间,然后自动唤醒,继续进入循环,直到队列中有元素,然后执行dequeue方法
  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. while (count == 0)
  6. notEmpty.await();
  7. return dequeue();
  8. } finally {
  9. lock.unlock();
  10. }
  11. }
  • 和前面指定时间的poll方法不同,当队中为空的时候,会一直等待,直到有人唤醒
  • 入队
方法 特点
offer(E e) 队列满的时候,返回false
offer(E e, long timeout, TimeUnit unit) 队列满的时候,等待一段时间,释放锁,一段时间后,进入就绪状态
add(E e) 队列满的时候,直接抛出异常
put(E e) 队列慢的时候,线程阻塞,直到有人唤醒
  • 出队
方法 特点
poll() 队列为空的时候,直接返回null
poll(long timeout, TimeUnit unit) 队列为空的时候,等待一段时间,释放锁,一段时候后,进入就绪状态
take() 队列为空的时候,一直等待,释放锁,直到被唤醒
  • 总体的设计思路,通过一个数组来模拟一个数组,出队和入队都是同步的,也就是同一时间只能有一个入队或者出队操作,然后在入队的时候,如果队列已满的话,根据方法的不同有不同的策略,可以直接返回或者抛出异常,也可以阻塞一段时间,等会在尝试入队,或者直接阻塞,直到有人唤醒。而出队的时候,如果为空可以直接返回,也可以等待一段时间然后再次尝试,也可以阻塞,直到有人唤醒

作者:jiajun 出处: http://www.cnblogs.com/-new/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。如果觉得还有帮助的话,可以点一下右下角的【推荐】,希望能够持续的为大家带来好的技术文章!想跟我一起进步么?那就【关注】我吧。

版权声明:本文为-new原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:http://www.cnblogs.com/-new/p/7783051.html