1. BlockingQueue由来 在生产者-消费者模式中,为了使生产者消费者解藕,需要一个存放元素的容器,使生产者可以只关心往队列里添加元素下,消费者只关系从队列中取出元素进程处理。
而且这个队列必须要满足两点:
线程安全
缓冲池空了,我们需要阻塞消费者,唤醒生产者;当缓冲池满了,我们需要阻塞生产者,唤醒消费者
JDK 为此设计了 阻塞队列(BlockingQueue),并提供了几个基于 BlockingQueue 接口 实现的一些线程安全的阻塞队列。
2. JDK提供的阻塞队列 JDK 定义了 BlockingQueue 接口,并在Java util.concurrent 下提供了一些实现类。
JDK 主要提供了 7 个阻塞队列,分别是:
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
阻塞队列
有界性
锁
数据结构
ArrayBlockingQueue
bounded
加锁
arraylist
LinkedBlockingQueue
optionally-bounded
加锁
linkedlist
LinkedBlockingDeque
optionally-bounded
加锁
linkedlist
LinkedTransferQueue
unbounded
无锁(CAS实现)
linkedlist
PriorityBlockingQueue
unbounded
加锁
heap
DelayQueue
unbounded
加锁
heap
3. BlockingQueue的操作方法 阻塞队列提供了四组不同的方法用于插入、移除、检查元素:
功能分类
抛出异常
返回特殊值
一直阻塞
超时退出
插入方法
add(e)
offer(e)
put(e)
offer(e,time,unit)
移除方法
remove()
poll()
take()
poll(time,unit)
检查方法
element()
peek()
-
-
抛出异常:如果试图的操作无法立即执行,抛异常。当阻塞队列满时候,再往队列里插入元素,会抛出llegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
返回特殊值:插入方法会返回是否成功,成功则返回 true。移除方法,则是从队列里拿出一个元素,如果没有则返回 null
一直阻塞:当阻塞队列满时,如果生产者线程往队列里 put 元素,队列会一直阻塞生产者线程,直到拿到数据,或者- 响应中断退出。当队列空时,消费者线程试图从队列里 take 元素,队列也会阻塞消费者线程,直到队列可用。
超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
需要特别注意的是:
不能往阻塞队列中插入null,会抛出空指针异常。
可以访问阻塞队列中的任意元素,调用remove(o)可以将队列之中的特定对象移除,但并不高效,尽量避免使用。
4. 阻塞队列实现原理 阻塞队列的原理很简单,利用了Lock锁的多条件(Condition)阻塞控制。接下来我们分析ArrayBlockingQueue JDK 1.8 的源码。
构造函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 public ArrayBlockingQueue(int capacity) { //..省略代码 } public ArrayBlockingQueue(int capacity, boolean fair) { //..省略代码 } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { //..省略代码 }
可以初始化队列大小, 且一旦初始化不能改变。构造方法中的fair表示控制对象的内部锁是否采用公平锁,默认是非公平锁 。Collection 可以传入最初包含的元素的集合。
首先是构造器,除了初始化队列的大小和是否是公平锁之外,还对同一个锁(lock)初始化了两个监视器 Condition,分别是notEmpty和notFull。这两个监视器的作用目前可以简单理解为标记分组,当该线程是put操作时,给他加上监视器notFull,标记这个线程是一个生产者;当线程是take操作时,给他加上监视器notEmpty,标记这个线程是消费者。
下面是初始化代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 final Object[] items;int takeIndex;int putIndex;int count;final ReentrantLock lock;private final Condition notEmpty;private final Condition notFull; public ArrayBlockingQueue (int capacity) { this (capacity, false ); } public ArrayBlockingQueue (int capacity, boolean fair) { if (capacity <= 0 ) throw new IllegalArgumentException (); this .items = new Object [capacity]; lock = new ReentrantLock (fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
put 操作的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; // 1.自旋拿锁 lock.lockInterruptibly(); try { // 2.判断队列是否满了 while (count == items.length) // 2.1如果满了,阻塞该线程,并标记为notFull线程, // 等待notFull的唤醒,唤醒之后继续执行while循环。 notFull.await(); //底层是通过LockSupport.park 调用 UNSAFE.park() 实现 // 3.如果没有满,则进入队列 enqueue(e); } finally { lock.unlock(); } } //入队并通过 notEmpty.signal() 唤醒一个消费者线程 private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; // 4 唤醒一个等待的线程 notEmpty.signal(); //底层是通过LockSupport.unpark 调用 UNSAFE.unpark(thread) 实现 }
总结put的流程:
所有执行put操作的线程竞争lock锁,拿到了lock锁的线程进入下一步,没有拿到lock锁的线程自旋竞争锁。
判断阻塞队列是否满了,如果满了,则调用await方法阻塞这个线程,并标记为notFull(生产者)线程,同时释放lock锁,等待被消费者线程唤醒。
如果没有满,则调用enqueue方法将元素put进阻塞队列。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了lock锁的线程。
唤醒一个标记为notEmpty(消费者)的线程。
take操作的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //用while 而不用 if 是为了唤醒后重新判断一次,避免count状态发生变化 while (count == 0) notEmpty.await(); //底层是通过LockSupport.park 调用 UNSAFE.park() 实现 return dequeue(); } finally { lock.unlock(); } } //从队列取出一个元素并通过 notFull.signal 唤醒生产者线程 private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); //底层是通过LockSupport.unpark 调用 UNSAFE.unpark(thread) 实现 return x; }
take操作和put操作的流程是类似的,总结一下take操作的流程:
所有执行take操作的线程竞争lock锁,拿到了lock锁的线程进入下一步,没有拿到lock锁的线程自旋竞争锁。
判断阻塞队列是否为空,如果是空,则调用await方法阻塞这个线程,并标记为notEmpty(消费者)线程,同时释放lock锁,等待被生产者线程唤醒。
如果没有空,则调用dequeue方法。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了lock锁的线程。
唤醒一个标记为notFull(生产者)的线程。
需要注意:
put和take操作都需要先获取锁,没有获取到锁的线程会被挡在第一道大门之外自旋拿锁,直到获取到锁。
就算拿到锁了之后,也不一定会顺利进行put/take操作,需要判断队列是否可用(是否满/空),如果不可用,则会被阻塞,并释放锁。
在第2点被阻塞的线程会被唤醒,但是在唤醒之后,依然需要拿到锁才能继续往下执行,否则,自旋拿锁
await 前面用while 而不用 if 是为了唤醒后重新判断一次,避免count状态发生变化。这里是有讲究的,因为这个线程被唤醒后条件里的值很可能已经改变了,不再满足了,如果用 if,线程唤醒后会根据程序计数器的记录直接执行if后面的逻辑,而用while 可以确保再判断一次。
通过上面代码可以看到, put, take 内部是用在没有成功之前是会一直阻塞的。下面看下 offer 和 poll 对比下
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
通过以上代码可以看到 通过 offer 方法插入的话是会立马返回结果 true / false, 而通过 poll 从队列删除元素 成功会直接返回元素对象, 失败会返回 null.
5. 示例以及使用场景 5.1 生产者-消费者模型 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class ArrayBlockingQueueDemo { private int queueSize = 10; private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize); public static void main(String[] args) { ArrayBlockingQueueDemo test = new ArrayBlockingQueueDemo(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread { @Override public void run() { consume(); } private void consume() { while (true) { try { queue.take(); Thread.sleep(new Random().nextInt(2000)); System.out.println("从队列取走一个元素,队列剩余" + queue.size() + "个元素"); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread { @Override public void run() { produce(); } private void produce() { while (true) { try { Thread.sleep(new Random().nextInt(1000)); queue.put(1); System.out.println("向队列取中插入一个元素,队列剩余空间:" + (queueSize - queue.size())); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 向队列取中插入一个元素,队列剩余空间:10 向队列取中插入一个元素,队列剩余空间:9 向队列取中插入一个元素,队列剩余空间:8 向队列取中插入一个元素,队列剩余空间:7 向队列取中插入一个元素,队列剩余空间:6 从队列取走一个元素,队列剩余4个元素 向队列取中插入一个元素,队列剩余空间:6 向队列取中插入一个元素,队列剩余空间:5 向队列取中插入一个元素,队列剩余空间:4 从队列取走一个元素,队列剩余6个元素 向队列取中插入一个元素,队列剩余空间:4 向队列取中插入一个元素,队列剩余空间:3 从队列取走一个元素,队列剩余7个元素 向队列取中插入一个元素,队列剩余空间:3 从队列取走一个元素,队列剩余7个元素 从队列取走一个元素,队列剩余6个元素 向队列取中插入一个元素,队列剩余空间:4 向队列取中插入一个元素,队列剩余空间:3 向队列取中插入一个元素,队列剩余空间:2
5.2 线程池中使用阻塞队列