Java BlockingQueue 深度教程
- 0. 前言:为什么需要 BlockingQueue?
- 1. 什么是 BlockingQueue?核心特性概览
- 2. BlockingQueue 接口核心方法详解
- 3. 常见的 BlockingQueue 实现类深度剖析
- 3.1 ArrayBlockingQueue:有界数组,单锁实现
- 3.2 LinkedBlockingQueue:(可选)有界链表,分离锁实现
- 3.3 PriorityBlockingQueue:支持优先级的无界队列
- 3.4 DelayQueue:延迟执行的无界队列
- 3.5 SynchronousQueue:不存储元素的“直接传递”队列
- 3.6 LinkedTransferQueue:更强大的无界链表队列
- 3.7 LinkedBlockingDeque:双端阻塞队列
- 4. BlockingQueue 实现原理深入:锁与条件变量
- 5. 如何选择合适的 BlockingQueue?
- 6. BlockingQueue 实战应用示例
- 6.1 示例一:实现请求批处理组件
- 6.2 示例二:经典的生产者-消费者模式
- 6.3 对比:使用非阻塞队列实现生产者-消费者
- 7. 高级话题与注意事项
- 7.1 公平性 (Fairness)
- 7.2 内存占用与 OOM 风险
- 7.3 中断处理 (`InterruptedException`)
- 7.4 `BlockingQueue` 与 `ThreadPoolExecutor`
- 8. 总结
0. 前言:为什么需要 BlockingQueue?
想象一下现实生活中的流水线:上游工序不断生产零件(生产者),下游工序不断取用零件进行组装(消费者)。为了保证流水线顺畅运行,通常会在上下游之间设置一个传送带或缓冲区。这个缓冲区有几个关键作用:
- 解耦:生产者和消费者的速度不需要完全匹配。生产者快了,零件可以在缓冲区暂存;消费者快了,可以等待缓冲区有新的零件。
- 缓冲:可以应对短时间内的生产或消费速率波动,提高系统整体的稳定性和效率。
- 协作:提供了一个明确的交接点,让生产者和消费者有序地传递工作成果。
在多线程编程的世界里,我们也面临类似的问题:不同的线程可能扮演着生产者和消费者的角色,它们需要一种安全、高效的方式来传递数据或任务。如果直接让它们共享一个普通的集合(如 ArrayList
或 LinkedList
),就会遇到很多麻烦:
- 线程安全问题:多个线程同时读写同一个集合,需要开发者手动添加复杂的锁机制来保证数据一致性,否则可能导致数据错乱、丢失甚至程序崩溃。
- 忙等待 (Busy Waiting):当缓冲区满时,生产者怎么办?当缓冲区空时,消费者怎么办?一种简单粗暴的方式是让线程不断地循环检查状态(“队列满了吗?”、“队列空了吗?”),这会极大地浪费 CPU 资源。
- 复杂的线程通信:需要开发者手动使用
wait()
,notify()
,notifyAll()
或更现代的Lock
/Condition
来协调线程的等待和唤醒,这非常容易出错,难以调试。
为了解决这些痛点,Java 并发包 (java.util.concurrent
) 提供了 BlockingQueue
(阻塞队列)接口及其多种实现。
1. 什么是 BlockingQueue?核心特性概览
BlockingQueue
是 java.util.concurrent
包下的一个接口,它继承自 java.util.Queue
接口。顾名思义,它首先是一个队列,遵循先进先出(FIFO)的基本原则(但也有例外,如 PriorityBlockingQueue
)。
它的核心特性在于 “阻塞 (Blocking)”:
- 入队阻塞:当队列已满时,尝试向队列中添加元素的线程会被阻塞(挂起/暂停执行),直到队列中有空间可用。
- 出队阻塞:当队列为空时,尝试从队列中获取元素的线程会被阻塞,直到队列中有元素可用。
除了阻塞特性,BlockingQueue
还具备以下关键特点:
- 线程安全 (Thread-Safety):
BlockingQueue
的所有实现都是线程安全的。多个线程可以同时安全地对其进行并发操作(入队、出队),而无需开发者进行额外的同步控制。其内部已经封装了必要的锁和同步机制。 - 多种操作策略:对于队列满或空的情况,
BlockingQueue
提供了不止阻塞一种处理方式。它定义了四组不同的方法,分别对应不同的处理策略:抛出异常、返回特殊值(null
或false
)、阻塞等待、超时等待。这提供了极大的灵活性。 - 容量限制 (Bounded/Unbounded):
BlockingQueue
的实现可以是有界 (Bounded) 的(即队列容量固定),也可以是无界 (Unbounded) 的(理论上容量无限,受限于系统内存)。
BlockingQueue
的主要作用:
- 生产者-消费者模式实现:这是
BlockingQueue
最经典、最核心的应用场景。它极大地简化了生产者-消费者模式的实现,让开发者无需关心底层的线程同步和通信细节。 - 线程池的任务队列:Java 的
ThreadPoolExecutor
就使用BlockingQueue
来存储待执行的任务。任务提交者是生产者,工作线程是消费者。 - 数据缓冲与流量控制:在异步处理、消息传递等场景中,用作数据缓冲区,可以平滑处理速率差异,防止生产者速度过快压垮消费者或导致系统资源耗尽(背压机制)。
- 解耦与异步化:将任务的提交与执行分离,实现系统模块间的解耦,提高系统的响应性和可伸缩性。
个人理解:
BlockingQueue
的核心价值在于它将并发编程中极其复杂且容易出错的状态依赖管理和线程间通信问题,封装在了简洁易用的队列接口之下。开发者不再需要手动编写synchronized
代码块、wait/notify
逻辑或复杂的Lock/Condition
协调代码。你只需要选择一个合适的BlockingQueue
实现,然后调用put()
或take()
这样语义清晰的方法,并发控制的魔法就在底层自动发生了。这使得我们能更专注于业务逻辑本身,显著提高了开发效率和代码质量。
它不仅仅是一个数据结构,更是一种强大的并发设计模式的体现。
2. BlockingQueue 接口核心方法详解
BlockingQueue
接口继承了 Queue
接口,并额外定义了阻塞方法。为了满足不同的并发场景需求,它提供了四组处理队列满/空情况的核心方法。理解这四组方法的区别至关重要。
操作类型 | 行为描述 | 入队方法 (Queue Full) | 出队方法 (Queue Empty) | 查看队首 (Queue Empty) | 继承来源 |
---|---|---|---|---|---|
1. 抛出异常 | 操作无法立即满足时,抛出未检查异常 | add(E e) -> IllegalStateException | remove() -> NoSuchElementException | element() -> NoSuchElementException | Collection |
2. 返回特殊值 | 操作无法立即满足时,返回特定值(false /null ) | offer(E e) -> false | poll() -> null | peek() -> null | Queue |
3. 阻塞 | 操作无法立即满足时,线程阻塞等待直到满足 | put(E e) -> (阻塞) | take() -> (阻塞) | (不支持) | BlockingQueue |
4. 超时阻塞 | 在指定时间内阻塞等待,超时后放弃 | offer(E e, long t, TimeUnit u) -> false | poll(long t, TimeUnit u) -> null | (不支持) | BlockingQueue |
方法详解与选择指南:
-
抛出异常组 (
add
,remove
,element
)- 特点:行为最激进,直接、快速地失败。源自
Collection
接口。 - 适用场景:当你认为队列满/空是一种程序错误或异常状态时使用。例如,在一个容量有限且期望永远不应该满的队列中,如果
add()
抛出异常,可能表示系统设计或负载预估有问题,需要立即处理。不适合常规的生产者-消费者流控制。 - 注意:
remove()
和element()
在队列为空时抛出NoSuchElementException
。
- 特点:行为最激进,直接、快速地失败。源自
-
返回特殊值组 (
offer
,poll
,peek
)- 特点:行为最温和,操作失败不影响程序流程(不抛异常),而是返回
false
或null
。调用者需要检查返回值来判断操作是否成功。源自Queue
接口。 - 适用场景:当你需要非阻塞地尝试操作,并且队列满/空是预期内可能发生的情况时。例如,你可以尝试向队列
offer
一个元素,如果失败(返回false
),则可以执行其他逻辑(如记录日志、丢弃数据、稍后重试等)。poll()
常用于消费者尝试获取数据,如果没有数据则可以去做其他事情。peek()
用于查看队首元素而不移除,队列为空返回null
。
- 特点:行为最温和,操作失败不影响程序流程(不抛异常),而是返回
-
阻塞组 (
put
,take
)- 特点:
BlockingQueue
的精髓所在。当操作条件不满足时(put
时队列满,take
时队列空),调用线程会自动进入阻塞状态,让出 CPU,直到其他线程执行了相应的操作(take
使队列不满,put
使队列不空)后,系统会自动唤醒等待的线程继续执行。 - 适用场景:经典的生产者-消费者模式。生产者使用
put()
添加数据,如果队列满了,自然就慢下来等待消费者;消费者使用take()
获取数据,如果队列空了,自然就停下来等待生产者。这是实现流量控制和线程协调最自然的方式。 - 注意:这两个方法会响应线程中断 (
InterruptedException
)。如果在等待过程中线程被中断,它们会抛出InterruptedException
。
- 特点:
-
超时阻塞组 (
offer(e, time, unit)
,poll(time, unit)
)- 特点:阻塞组的变体,增加了超时限制。线程会阻塞等待,但如果在指定的时间内条件仍未满足,则会自动放弃等待并返回一个表示失败的值(
offer
返回false
,poll
返回null
)。 - 适用场景:当你希望线程等待一段时间,但又不希望无限期等待下去时。这在需要保证系统响应性、防止线程永久阻塞的场景中非常有用。例如,一个任务处理器尝试从队列获取任务,可以
poll
等待 1 秒,如果 1 秒内没有任务,它可以去做一些周期性的维护工作,而不是一直傻等。 - 注意:同样会响应线程中断。
- 特点:阻塞组的变体,增加了超时限制。线程会阻塞等待,但如果在指定的时间内条件仍未满足,则会自动放弃等待并返回一个表示失败的值(
选择哪组方法?
- 追求简单可靠的生产者-消费者协作:优先使用
put()
和take()
。 - 需要非阻塞地尝试操作或自定义失败逻辑:使用
offer()
和poll()
。 - 需要避免无限等待,增加系统健壮性:使用带超时的
offer()
和poll()
。 - 希望在异常条件下快速失败:谨慎使用
add()
和remove()
。
核心要点: BlockingQueue
通过这四组方法,将复杂的线程同步和状态管理逻辑封装起来,提供了不同粒度的控制策略,让开发者可以根据具体需求灵活选择。
3. 常见的 BlockingQueue 实现类深度剖析
Java 并发包提供了多种 BlockingQueue
的实现,它们在内部数据结构、容量限制、锁机制、性能特性等方面各有不同。了解它们的差异和适用场景是高效使用 BlockingQueue
的关键。
3.1 ArrayBlockingQueue:有界数组,单锁实现
ArrayBlockingQueue
是一个基于定长数组实现的有界阻塞队列。
核心特点:
- 有界 (Bounded):容量在创建时指定,之后不可改变。
- 底层结构:内部使用一个
Object[]
数组来存储元素,并通过两个索引putIndex
和takeIndex
来管理队列的头部和尾部,实现循环队列的效果。 - 单锁机制 (Single Lock):使用一个
ReentrantLock
实例来控制对整个队列的并发访问。无论是入队还是出队操作,都需要获取这把唯一的锁。这意味着同一时刻,入队和出队操作是互斥的,不能并发执行。 - 公平性 (Fairness):可以在构造函数中选择是创建公平锁还是非公平锁(默认非公平)。公平锁模式下,等待时间最长的线程会优先获得锁,可以防止线程饥饿,但通常会牺牲一些吞吐量。
- 内存预分配:创建时即分配数组所需的全部内存空间。
适用场景:
- 队列容量固定且已知。
- 需要严格控制资源使用,防止队列无限增长。
- 对内存占用比较敏感,希望内存使用量稳定。
- 需要支持公平性策略。
- 生产者和消费者速率相对平衡,或者并发度不高的场景(因为单锁可能成为瓶颈)。
为什么使用单锁?
- 实现简单:相对于分离锁,单锁的逻辑更简单,不易出错。
- 数组结构特性:数组的入队和出队操作通常都需要更新共享的计数器 (
count
) 以及相应的索引 (putIndex
,takeIndex
),使用单锁更容易保证这些状态的一致性。
源码浅析 (基于 JDK 8 / OpenJDK 简化示意):
public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {// 存储元素的数组final Object[] items;// 下一个 take, poll, peek 或 remove 的索引int takeIndex;// 下一个 put, offer, or add 的索引int putIndex;// 队列中的元素数量int count;// --- 核心锁和条件变量 ---/** 控制所有访问的主锁 */final ReentrantLock lock;/** 等待 take 的条件 (队列非空) */private final Condition notEmpty;/** 等待 put 的条件 (队列非满) */private final Condition notFull;// 构造函数 (指定容量和公平性)public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair); // 创建锁,可指定公平性notEmpty = lock.newCondition(); // 绑定到 locknotFull = lock.newCondition(); // 绑定到 lock}// 入队核心逻辑 (被 put, offer 调用)private void enqueue(E x) {final Object[] items = this.items;items[putIndex] = x; // 放置元素if (++putIndex == items.length) putIndex = 0; // 循环移动 putIndexcount++; // 增加计数notEmpty.signal(); // 唤醒可能在等待 take 的线程 (因为队列现在不空了)}// 出队核心逻辑 (被 take, poll 调用)private E dequeue() {final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex]; // 获取元素items[takeIndex] = null; // 帮助 GCif (++takeIndex == items.length) takeIndex = 0; // 循环移动 takeIndexcount--; // 减少计数// (这里还可能需要处理迭代器相关逻辑,简化省略)notFull.signal(); // 唤醒可能在等待 put 的线程 (因为队列现在不满了)return x;}// put 方法 (阻塞式入队)public void put(E e) throws InterruptedException {checkNotNull(e); // 不允许 null 元素final ReentrantLock lock = this.lock;lock.lockInterruptibly(); // 获取锁,允许中断try {// 使用 while 循环防止虚假唤醒while (count == items.length) { // 如果队列满了notFull.await(); // 在 notFull 条件上阻塞等待}enqueue(e); // 队列未满,执行入队} finally {lock.unlock(); // 必须释放锁}}// take 方法 (阻塞式出队)public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly(); // 获取锁,允许中断try {// 使用 while 循环防止虚假唤醒while (count == 0) { // 如果队列空了notEmpty.await(); // 在 notEmpty 条件上阻塞等待}return dequeue(); // 队列非空,执行出队} finally {lock.unlock(); // 必须释放锁}}// offer 方法 (非阻塞入队)public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lock(); // 获取锁 (这里用 lock() 而非 lockInterruptibly() 是 Queue 接口规范)try {if (count == items.length) // 队列满return false; // 直接返回 falseelse {enqueue(e); // 入队return true;}} finally {lock.unlock();}}// poll 方法 (非阻塞出队)public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {return (count == 0) ? null : dequeue(); // 队列空返回 null,否则出队} finally {lock.unlock();}}// offer 带超时的方法public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {checkNotNull(e);long nanos = unit.toNanos(timeout); // 转换超时时间为纳秒final ReentrantLock lock = this.lock;lock.lockInterruptibly(); // 获取锁,允许中断try {while (count == items.length) { // 队列满if (nanos <= 0) // 如果超时时间已过return false; // 返回失败nanos = notFull.awaitNanos(nanos); // 在 notFull 上等待指定时间}enqueue(e); // 入队return true;} finally {lock.unlock();}}// poll 带超时的方法public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0) { // 队列空if (nanos <= 0) // 超时return null; // 返回 nullnanos = notEmpty.awaitNanos(nanos); // 在 notEmpty 上等待指定时间}return dequeue(); // 出队} finally {lock.unlock();}}// ... 其他方法如 size(), contains() 等也需要获取锁 ...private static void checkNotNull(Object v) {if (v == null)throw new NullPointerException();}
}
源码解读要点:
- 单一锁
lock
:所有修改队列状态的操作(enqueue
,dequeue
)以及阻塞操作(put
,take
, 带超时的offer
/poll
)都需要先获取lock
。 - 两个条件变量
notEmpty
,notFull
:它们都绑定在同一个lock
上。notEmpty
用于在队列为空时阻塞take
操作,notFull
用于在队列满时阻塞put
操作。 await()
与signal()
:当put
时发现队列满,线程调用notFull.await()
释放lock
并进入等待状态。当take
成功执行dequeue()
后,会调用notFull.signal()
来唤醒一个(可能存在的)因队列满而等待的put
线程。take
操作的等待和put
操作的唤醒逻辑类似,使用notEmpty
条件变量。while
循环检查条件:在调用await()
之前,必须使用while (condition)
而不是if (condition)
来检查条件。这是为了防止虚假唤醒 (Spurious Wakeup)。线程可能在没有被signal()
的情况下被唤醒,使用while
循环可以确保线程被唤醒后再次检查条件是否真正满足。lockInterruptibly()
:阻塞方法put
,take
以及带超时的offer/poll
使用lockInterruptibly()
获取锁。这意味着在等待锁或在await()
期间,如果线程被中断,会抛出InterruptedException
,允许调用者响应中断。而offer()
和poll()
(无超时) 使用lock()
,这是为了遵循Queue
接口的规范,它们不应该抛出InterruptedException
。
难点理解:
为什么入队和出队不能并发?
因为它们共享同一把锁lock
。一个线程执行put
时持有lock
,另一个线程尝试执行take
时必须等待lock
被释放。反之亦然。这就像一个只有一个窗口的银行柜台,一次只能服务一个客户,无论是存款(入队)还是取款(出队)。
3.2 LinkedBlockingQueue:(可选)有界链表,分离锁实现
LinkedBlockingQueue
是一个基于链表实现的可选有界阻塞队列。
核心特点:
- 可选有界 (Optionally Bounded):可以在构造时指定容量。如果不指定容量,默认容量为
Integer.MAX_VALUE
,这通常被认为是无界队列(但实际上受限于系统内存)。 - 底层结构:内部使用单向链表结构 (
Node
节点) 存储元素。 - 分离锁/双锁机制 (Two Locks):这是
LinkedBlockingQueue
与ArrayBlockingQueue
最核心的区别。它内部使用两个ReentrantLock
:putLock
:控制入队操作(添加到链表尾部)。takeLock
:控制出队操作(从链表头部移除)。- 同时,使用一个
AtomicInteger
类型的count
来原子地维护队列中的元素数量,协调两把锁的操作。
- 高吞吐潜力:由于入队和出队使用不同的锁,只要队列既不空也不满,入队和出队操作可以并发执行,这通常能带来比
ArrayBlockingQueue
更高的吞吐量,尤其是在多核 CPU 和高并发场景下。 - 不支持公平性:
LinkedBlockingQueue
的锁是非公平的。 - 内存动态分配:链表节点按需创建,内存占用随元素数量动态变化。
适用场景:
- 队列容量不确定或需要非常大的容量(接近无界)。
- 追求高并发吞吐量,特别是生产者和消费者速率可能不匹配或波动较大的场景。
- 对公平性没有要求。
- 对锁竞争敏感的系统。
与 ArrayBlockingQueue 的主要区别:
特性 | ArrayBlockingQueue | LinkedBlockingQueue |
---|---|---|
底层结构 | 定长数组 | 链表 |
容量 | 有界 (必须指定) | 可选有界 (默认 Integer.MAX_VALUE ) |
锁机制 | 单锁 (ReentrantLock ) | 双锁 (putLock , takeLock ) |
并发性 | 入队/出队互斥 | 入队/出队可并发 |
公平性 | 可选 (构造时指定) | 不支持 (非公平) |
内存分配 | 创建时预分配 | 按需动态分配 (节点) |
吞吐量 | 通常较低(单锁瓶颈) | 通常较高(锁分离) |
GC 压力 | 较低 | 可能较高 (节点对象创建/销毁) |
源码浅析 (基于 JDK 8 / OpenJDK 简化示意):
public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {// 链表节点定义static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }}// 容量 (final, 创建时确定)private final int capacity;// 当前元素数量 (原子类型,用于协调两把锁)private final AtomicInteger count = new AtomicInteger();// 链表头节点 (head.item 永远为 null, head.next 是第一个实际元素)transient Node<E> head;// 链表尾节点 (last.next 永远为 null)private transient Node<E> last;// --- 核心锁和条件变量 ---/** 控制 take, poll 等操作的锁 */private final ReentrantLock takeLock = new ReentrantLock();/** takeLock 对应的条件 (等待队列非空) */private final Condition notEmpty = takeLock.newCondition();/** 控制 put, offer 等操作的锁 */private final ReentrantLock putLock = new ReentrantLock();/** putLock 对应的条件 (等待队列非满) */private final Condition notFull = putLock.newCondition();// 构造函数 (可以指定容量)public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null); // 初始化空链表}// 默认构造函数 (容量为 Integer.MAX_VALUE)public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}// 入队核心逻辑 (需要持有 putLock)private void enqueue(Node<E> node) {// assert putLock.isHeldByCurrentThread();// assert last.next == null;last = last.next = node; // 将新节点添加到尾部}// 出队核心逻辑 (需要持有 takeLock)private E dequeue() {// assert takeLock.isHeldByCurrentThread();// assert head.item == null;Node<E> h = head;Node<E> first = h.next; // 获取第一个实际节点h.next = h; // 帮助 GC (将旧头节点指向自己,断开与链表的连接)head = first; // 将第一个实际节点设为新的头节点E x = first.item; // 获取元素值first.item = null; // 帮助 GCreturn x;}// 发信号通知等待的生产者/消费者 (封装唤醒逻辑)// 注意:signalNotEmpty() 在 put 操作后调用,signalNotFull() 在 take 操作后调用// 它们获取的是**对方**的锁来发信号void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock(); // 获取 takeLocktry {notEmpty.signal(); // 唤醒一个等待 take 的线程} finally {takeLock.unlock();}}void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock(); // 获取 putLocktry {notFull.signal(); // 唤醒一个等待 put 的线程} finally {putLock.unlock();}}// put 方法 (阻塞式入队)public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1; // 用于记录 put 前的计数值Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly(); // 获取 putLocktry {// 等待队列非满 (注意: count 是原子变量,读取是原子的,但检查和等待不是)while (count.get() == capacity) {notFull.await(); // 在 notFull 条件上等待 (释放 putLock)}enqueue(node); // 执行入队 (持有 putLock)c = count.getAndIncrement(); // 原子地增加计数,并获取增加前的值// 如果增加后的计数仍然小于容量,说明队列还没满,// 可能还有其他 put 线程可以被唤醒 (优化点)if (c + 1 < capacity)notFull.signal(); // 唤醒下一个等待 put 的线程} finally {putLock.unlock(); // 释放 putLock}// 如果 c == 0,说明在 put 之前队列是空的。// 这次 put 使得队列从空变为非空,需要唤醒等待 take 的线程。// 注意:唤醒操作在释放 putLock 之后进行,减少锁持有时间。if (c == 0)signalNotEmpty();}// take 方法 (阻塞式出队)public E take() throws InterruptedException {E x;int c = -1; // 用于记录 take 前的计数值final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly(); // 获取 takeLocktry {// 等待队列非空while (count.get() == 0) {notEmpty.await(); // 在 notEmpty 条件上等待 (释放 takeLock)}x = dequeue(); // 执行出队 (持有 takeLock)c = count.getAndDecrement(); // 原子地减少计数,并获取减少前的值// 如果减少前的计数大于 1,说明 take 之后队列还不空,// 可能还有其他 take 线程可以被唤醒 (优化点)if (c > 1)notEmpty.signal(); // 唤醒下一个等待 take 的线程} finally {takeLock.unlock(); // 释放 takeLock}// 如果 c == capacity,说明在 take 之前队列是满的。// 这次 take 使得队列从满变为不满,需要唤醒等待 put 的线程。// 注意:唤醒操作在释放 takeLock 之后进行。if (c == capacity)signalNotFull();return x;}// offer(E e) (非阻塞入队)public boolean offer(E e) {if (e == null) throw new NullPointerException();final AtomicInteger count = this.count;// 先检查容量,如果满了直接返回 false (快速失败路径)if (count.get() == capacity)return false;int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;putLock.lock(); // 获取 putLocktry {// 再次检查容量,防止在获取锁之前状态变化if (count.get() < capacity) {enqueue(node); // 入队c = count.getAndIncrement(); // 原子增加计数// 唤醒可能等待的 put 线程 (优化)if (c + 1 < capacity)notFull.signal();return true; // 成功} else {return false; // 获取锁后发现满了}} finally {putLock.unlock();}// 唤醒 take 线程 (如果需要)if (c == 0)signalNotEmpty();// return true; // 这里返回 true 是因为上面 try 块里成功了}// poll() (非阻塞出队)public E poll() {final AtomicInteger count = this.count;// 先检查是否为空 (快速失败路径)if (count.get() == 0)return null;E x = null;int c = -1;final ReentrantLock takeLock = this.takeLock;takeLock.lock(); // 获取 takeLocktry {// 再次检查是否为空if (count.get() > 0) {x = dequeue(); // 出队c = count.getAndDecrement(); // 原子减少计数// 唤醒可能等待的 take 线程 (优化)if (c > 1)notEmpty.signal();}// else: 获取锁后发现空了,x 保持 null} finally {takeLock.unlock();}// 唤醒 put 线程 (如果需要)if (c == capacity)signalNotFull();return x; // 返回元素或 null}// 带超时的 offer/poll 方法与 ArrayBlockingQueue 类似,// 只是它们分别使用 putLock/notFull 和 takeLock/notEmpty,// 并在成功操作后判断是否需要调用 signalNotEmpty() 或 signalNotFull()。// 源码略。// ... 其他方法如 size() 使用 count.get() 实现,peek() 需要获取 takeLock ...
}
源码解读要点:
- 分离锁
putLock
,takeLock
:入队相关操作(put
,offer
)获取putLock
,出队相关操作(take
,poll
,peek
)获取takeLock
。 - 原子计数器
count
:count
是AtomicInteger
类型,它的增减操作 (getAndIncrement
,getAndDecrement
) 是原子的,用来在无锁的情况下协调putLock
和takeLock
之间的状态(队列是否为空/满)。这是实现锁分离的关键。 - 条件变量分离:
notEmpty
条件绑定在takeLock
上,notFull
条件绑定在putLock
上。 - 精妙的唤醒逻辑 (
signalNotEmpty
,signalNotFull
):- 当一个
put
操作使得队列从空变为非空时 (即c == 0
),它需要唤醒一个可能在notEmpty
上等待的take
线程。注意,这个唤醒操作 (signalNotEmpty
) 是在释放putLock
之后进行的,并且它内部需要获取takeLock
来调用notEmpty.signal()
。 - 当一个
take
操作使得队列从满变为非满时 (即c == capacity
),它需要唤醒一个可能在notFull
上等待的put
线程。类似地,这个唤醒操作 (signalNotFull
) 在释放takeLock
之后进行,内部需要获取putLock
来调用notFull.signal()
。 - 这种在释放自己的锁之后,去获取对方的锁来发送信号的设计,是为了减少锁的持有时间,进一步提高并发性能。
- 当一个
- 队内唤醒优化:
- 在
put
操作成功后,如果发现队列在增加元素后仍然未满 (c + 1 < capacity
),会调用notFull.signal()
。这是为了唤醒其他可能也在等待入队的put
线程,形成一种级联唤醒,提高生产者的并发度。 - 在
take
操作成功后,如果发现队列在减少元素后仍然不空 (c > 1
),会调用notEmpty.signal()
,以唤醒其他等待的take
线程,提高消费者的并发度。
- 在
offer
/poll
的双重检查:非阻塞方法offer
和poll
通常会先在无锁状态下快速检查条件(count.get() == capacity
或count.get() == 0
),如果条件不满足,再尝试获取锁并进行第二次检查。这是一种优化,避免了不必要的锁获取开销。
难点理解:
双锁如何协同工作?
关键在于原子计数器count
和跨锁的signal
机制。
count
提供了一个全局一致的队列大小视图,即使两个锁可以并发操作头尾,count
的原子更新保证了对队列空/满状态的判断是可靠的(虽然检查和后续操作之间仍有时间窗口,需要锁内再次确认)。- 当
put
线程发现队列满了 (count.get() == capacity
),它会在putLock
控制下的notFull
条件上等待。当take
线程成功取走一个元素,使得队列从满变为不满 (c == capacity
),它会去获取putLock
并调用notFull.signal()
来唤醒等待的put
线程。- 反之亦然,
take
线程等待notEmpty
,由put
线程在队列从空变为非空时 (c == 0
) 去获取takeLock
并signal
。这种设计允许了头尾操作的并发,同时通过
count
和条件变量确保了在边界条件(空/满)下的正确阻塞和唤醒。
3.3 PriorityBlockingQueue:支持优先级的无界队列
PriorityBlockingQueue
是一个支持优先级的无界阻塞队列。
核心特点:
- 无界 (Unbounded):理论上容量没有限制(受限于内存),因此
put()
和offer()
方法永远不会阻塞,也永远不会返回false
(除非系统内存耗尽导致OutOfMemoryError
)。 - 优先级排序:队列中的元素必须实现
Comparable
接口,或者在构造队列时提供一个Comparator
。队列会根据元素的自然顺序或指定的比较器顺序对元素进行排序。每次take()
或poll()
操作返回的都是当前队列中优先级最高(根据排序规则是最小或最大,取决于实现,Java 标准库是最小元素优先)的元素。 - 底层结构:内部通常使用二叉堆 (Binary Heap)(具体是最小堆 Min-Heap)来实现优先级排序。数据存储在一个可动态扩容的数组中。
- 单锁机制:与
ArrayBlockingQueue
类似,PriorityBlockingQueue
也使用一个ReentrantLock
来控制所有访问。这是因为堆操作(插入siftUp
、删除siftDown
)通常需要修改堆的多个位置,保证整个堆结构的正确性,使用单锁更容易实现。 - 出队阻塞:只有当队列为空时,
take()
操作才会阻塞。 - 不允许
null
元素。 - 不保证相同优先级元素的顺序。
适用场景:
- 需要处理带有优先级的任务或数据,例如任务调度系统(高优先级任务先执行)、事件处理(紧急事件优先处理)。
- 生产者速率远大于消费者速率,且不希望生产者被阻塞(但要注意内存消耗)。
源码浅析 (基于 JDK 8 / OpenJDK 简化示意):
public class PriorityBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {// 存储元素的数组 (会动态扩容)private transient Object[] queue;// 队列中的元素数量private transient int size;// 用于比较元素的比较器,如果为 null,则元素必须实现 Comparableprivate transient Comparator<? super E> comparator;// --- 核心锁和条件变量 ---/** 控制所有访问的主锁 */private final ReentrantLock lock = new ReentrantLock();/** 等待队列非空的条件 (只有 take 会等待) */private final Condition notEmpty = lock.newCondition();// (没有 notFull 条件,因为队列无界)// 构造函数public PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator) {// ... 初始化 queue, size, comparator ...this.comparator = comparator;this.queue = new Object[initialCapacity];}// 其他构造函数...// put 方法 (非阻塞,因为无界)public void put(E e) {offer(e); // 直接调用 offer}// offer 方法 (核心入队逻辑)public boolean offer(E e) {if (e == null) throw new NullPointerException();final ReentrantLock lock = this.lock;lock.lock(); // 获取锁int n, cap;Object[] array;// 检查是否需要扩容while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap); // 扩容数组try {Comparator<? super E> cmp = comparator;if (cmp == null)siftUpComparable(n, e, array); // 使用 Comparable 进行堆上滤elsesiftUpUsingComparator(n, e, array, cmp); // 使用 Comparator 进行堆上滤size = n + 1; // 增加大小notEmpty.signal(); // 唤醒可能等待 take 的线程} finally {lock.unlock(); // 释放锁}return true; // 永远返回 true (除非 OOM)}// take 方法 (阻塞式出队)public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly(); // 获取锁E result;try {// 等待队列非空while (size == 0) {notEmpty.await(); // 在 notEmpty 上等待}result = dequeue(); // 执行出队} finally {lock.unlock(); // 释放锁}return result;}// poll 方法 (非阻塞出队)public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {return dequeue(); // 直接尝试出队} finally {lock.unlock();}}// poll 带超时的方法public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while (size == 0) { // 队列空if (nanos <= 0) // 超时return null;nanos = notEmpty.awaitNanos(nanos); // 等待指定时间}result = dequeue(); // 出队} finally {lock.unlock();}return result;}// 出队核心逻辑 (获取并移除堆顶元素,然后调整堆)private E dequeue() {// assert lock.isHeldByCurrentThread();int n = size - 1;if (n < 0) // 如果队列实际为空return null;else {Object[] array = queue;E result = (E) array[0]; // 获取堆顶元素 (优先级最高)E x = (E) array[n]; // 获取最后一个元素array[n] = null; // 帮助 GCComparator<? super E> cmp = comparator;// 将最后一个元素放到堆顶,然后进行下滤调整if (cmp == null)siftDownComparable(0, x, array, n); // 使用 Comparable 下滤elsesiftDownUsingComparator(0, x, array, n, cmp); // 使用 Comparator 下滤size = n; // 更新大小return result;}}// --- 堆操作核心方法 (Heap Operations) ---// 上滤 (将新元素插入末尾后,向上调整以保持堆性质)private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {int parent = (k - 1) >>> 1; // 父节点索引Object e = array[parent];if (key.compareTo((T) e) >= 0) // 如果新元素不小于父节点,则位置正确break;// 否则,将父节点下移,继续向上比较array[k] = e;k = parent;}array[k] = key; // 找到正确位置,放入元素}// siftUpUsingComparator 类似,只是使用 Comparator 比较// 下滤 (将最后一个元素放到堆顶后,向下调整以保持堆性质)private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {Comparable<? super T> key = (Comparable<? super T>)x;int half = n >>> 1; // 只需比较到非叶子节点while (k < half) {int child = (k << 1) + 1; // 左子节点索引Object c = array[child];int right = child + 1; // 右子节点索引// 如果右子节点存在且比左子节点小,则选择右子节点进行比较if (right < n &&((Comparable<? super T>) c).compareTo((T) array[right]) > 0)c = array[child = right];// 如果当前元素不大于较小的子节点,则位置正确if (key.compareTo((T) c) <= 0)break;// 否则,将较小的子节点上移,继续向下比较array[k] = c;k = child;}array[k] = key; // 找到正确位置,放入元素}// siftDownUsingComparator 类似// 数组扩容逻辑 (省略具体实现,通常是翻倍或增加 50%)private void tryGrow(Object[] array, int oldCap) {// ...}
}
源码解读要点:
- 无界特性体现:
put
/offer
方法内部没有检查容量的while
循环等待,只有在必要时进行tryGrow
扩容。它们总是尝试添加元素,然后signal
可能等待的take
线程。 - 单锁
lock
:所有修改(offer
,poll
,take
)和查看(peek
,size
等)操作都需要获取lock
。 notEmpty
条件:只用于take
和带超时的poll
在队列为空时等待。- 堆操作核心:
offer
操作的核心是siftUp
(上滤),poll
/take
操作的核心是siftDown
(下滤)。这些操作保证了队列始终维持最小堆的性质(优先级最高的元素在堆顶,即数组索引 0 处)。 - 比较机制:通过
Comparator
或元素的Comparable
接口来决定优先级。 - 动态扩容
tryGrow
:当数组满时,会自动扩展内部数组的大小。
难点理解:
为什么 PriorityBlockingQueue 使用单锁?
优先级队列的核心操作是维持堆的有序性。siftUp
和siftDown
操作可能会涉及到从根节点到叶子节点路径上的多个元素的比较和移动。如果使用分离锁(比如一个锁管入队,一个锁管出队),很难在并发环境下高效且正确地维护整个堆的结构一致性。例如,一个入队操作正在进行siftUp
,同时一个出队操作正在进行siftDown
,它们可能操作堆的不同部分,但最终都需要保证全局的堆性质,协调起来非常复杂。单锁虽然牺牲了一些并发性(入队和出队互斥),但极大地简化了实现的正确性保证。考虑到优先级队列通常用在任务调度等场景,写入和读取可能不是完全均衡的,单锁往往是一个合理的权衡。
3.4 DelayQueue:延迟执行的无界队列
DelayQueue
是一个特殊的无界阻塞队列,它内部的元素只有在到达指定的延迟时间后才能被消费者从队列中获取。
核心特点:
- 无界 (Unbounded):与
PriorityBlockingQueue
类似,put()
和offer()
永远不会阻塞。 - 延迟获取 (Delayed Fetch):队列中的元素必须实现
java.util.concurrent.Delayed
接口。该接口只有一个方法long getDelay(TimeUnit unit)
,返回剩余的延迟时间。只有当getDelay()
返回值小于等于 0 时,该元素才能被take()
或poll()
获取。 - 优先级排序:
DelayQueue
内部实际上使用一个PriorityQueue
(非线程安全) 来存储元素。排序的依据就是元素的剩余延迟时间 (getDelay()
的返回值),剩余时间最短的元素排在队首。 - 阻塞机制:
put
/offer
: 不阻塞。take
: 如果队列为空,或者队列中所有元素的延迟时间都还没到,take
操作会阻塞。它会阻塞到队首元素的延迟时间到达为止。poll
: 非阻塞。如果队首元素延迟未到或队列为空,返回null
。带超时的poll
类似take
,但有等待时间上限。
- 单锁机制:使用一个
ReentrantLock
(lock
) 和一个关联的Condition
(available
) 来实现线程安全和阻塞唤醒。 - Leader-Follower 模式 (优化):为了避免多个消费者线程在
available.await()
上无效地醒来(可能只有一个元素的延迟到了),DelayQueue
内部实现了一种类似 Leader-Follower 的模式。当一个线程(Leader)发现队首元素的延迟时间未到,它会阻塞等待指定时间 (available.awaitNanos(delay)
)。其他后续的线程(Follower)如果发现已经有一个 Leader 线程在等待了,它们会直接无限期等待 (available.await()
),直到被 Leader 唤醒。当 Leader 线程等待超时(意味着队首元素可能到期了)或者被其他线程(如put
操作)唤醒时,它会检查队首元素是否真的到期。如果到期,它会取走元素并唤醒一个 Follower 线程(如果存在)来接替 Leader 的角色;如果未到期,它会重新计算延迟并继续等待。这种机制减少了不必要的线程唤醒和 CPU 消耗。
Delayed
接口:
package java.util.concurrent;public interface Delayed extends Comparable<Delayed> {// 返回此对象相关的剩余延迟时间,以给定的时间单位表示。long getDelay(TimeUnit unit); // Delayed 接口继承了 Comparable 接口,// 通常比较的就是 getDelay() 的结果,即剩余时间短的优先级高。
}
你需要自己实现 Delayed
接口,通常包含一个表示到期时间的成员变量。
适用场景:
- 定时任务调度:缓存过期(只有过期的缓存项才能被移除)、定时发送通知、需要延迟执行的任务等。
- 实现简易的定时器。
源码浅析 (基于 JDK 8 / OpenJDK 简化示意):
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {private final transient ReentrantLock lock = new ReentrantLock();// 内部使用 PriorityQueue 存储并按延迟时间排序private final PriorityQueue<E> q = new PriorityQueue<E>();// --- Leader-Follower 相关 ---// 当前是否有线程正在作为 Leader 等待队首元素到期private Thread leader = null; // 用于所有线程 (Leader 和 Followers) 等待的条件变量private final Condition available = lock.newCondition();// put/offer (非阻塞)public void put(E e) {offer(e);}public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e); // 加入 PriorityQueue// 如果插入的元素成为了新的队首 (说明它的延迟时间最短)if (q.peek() == e) {leader = null; // 重置 Leaderavailable.signal(); // 唤醒可能在等待的线程 (Leader 或 Follower)}return true;} finally {lock.unlock();}}// 带超时的 offer 也是非阻塞的// take 方法 (阻塞式获取到期元素)public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) { // 无限循环,直到获取到元素E first = q.peek(); // 查看队首元素if (first == null) { // 队列为空available.await(); // 无限期等待,直到被 put 唤醒} else {long delay = first.getDelay(TimeUnit.NANOSECONDS); // 获取剩余延迟if (delay <= 0) // 如果已到期return q.poll(); // 从 PriorityQueue 中移除并返回 (这是 take 的核心)// --- 延迟未到,进入 Leader-Follower 等待逻辑 ---first = null; // help GC,因为要长时间等待if (leader != null) // 如果已有 Leader 在等待available.await(); // 作为 Follower 无限期等待else { // 没有 Leader,当前线程成为 LeaderThread thisThread = Thread.currentThread();leader = thisThread; // 标记自己为 Leadertry {// Leader 等待指定时间 (队首元素的剩余延迟)available.awaitNanos(delay); } finally {// 不论是超时唤醒还是被 put 唤醒,退出等待后都要取消 Leader 身份if (leader == thisThread)leader = null; }}// 被唤醒后,循环回到 for (;;) 再次检查队首元素状态}}} finally {// 如果退出 take 时 (比如获取到元素或抛出异常),// 并且当前线程是 Leader 或者队列不为空,// 那么需要唤醒一个 Follower (如果存在) 来接替检查/等待if (leader == null && q.peek() != null)available.signal(); lock.unlock();}}// poll() (非阻塞获取)public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {E first = q.peek();// 队列为空或队首元素未到期,返回 nullif (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)return null;elsereturn q.poll(); // 到期,移除并返回} finally {lock.unlock();}}// poll 带超时的方法 (简化逻辑示意)public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null) { // 队列空if (nanos <= 0) return null; // 超时nanos = available.awaitNanos(nanos); // 等待,会减少 nanos} else {long delay = first.getDelay(TimeUnit.NANOSECONDS);if (delay <= 0) // 到期return q.poll();if (nanos <= 0) return null; // 超时// --- Leader-Follower 等待逻辑 (与 take 类似,但使用 awaitNanos) ---first = null; if (nanos < delay || leader != null) { // 等待时间不足 或 已有 Leadernanos = available.awaitNanos(nanos); // 作为 Follower 等待剩余时间} else { // 成为 Leader 等待 delay 时间Thread thisThread = Thread.currentThread();leader = thisThread;try {long timeLeft = available.awaitNanos(delay);nanos -= delay - timeLeft; // 更新剩余总等待时间} finally {if (leader == thisThread) leader = null;}}// 循环继续检查}}} finally {if (leader == null && q.peek() != null) available.signal();lock.unlock();}}// ... 其他方法如 size(), peek() 等都需要获取锁 ...
}
源码解读要点:
- 内部
PriorityQueue
:核心存储和排序由非线程安全的PriorityQueue
完成。DelayQueue
的主要工作是为其提供线程安全包装和延迟阻塞逻辑。 lock
和available
:所有操作都需要lock
。available
条件变量用于在队列为空或队首元素未到期时阻塞消费者线程。offer
逻辑:很简单,加锁,添加到PriorityQueue
,如果新元素成为队首,则唤醒可能等待的线程(因为最短延迟时间可能变了),然后解锁。take
核心循环:- 查看队首
peek()
。 - 如果为空,
await()
无限等待。 - 如果不为空,获取延迟
getDelay()
。 - 如果延迟
<= 0
(已到期),poll()
出队并返回。 - 如果延迟
> 0
(未到期),进入 Leader-Follower 等待:- 已有 Leader?
await()
无限等待(当 Follower)。 - 没有 Leader?成为 Leader,
awaitNanos(delay)
等待指定时间。
- 已有 Leader?
- 等待结束后(超时或被
offer
唤醒),leader
状态被重置(如果当前线程是 Leader),然后循环回到第 1 步重新检查。
- 查看队首
- Leader-Follower 优化:
leader
变量和相关的判断逻辑是为了确保在任何时候最多只有一个线程(Leader)在精确地等待队首元素的到期时间。其他线程(Followers)则无限期等待,避免了大量线程被同时唤醒然后又因为元素未到期而立即重新等待的情况,减少了锁竞争和 CPU 消耗。 - 退出时的
signal
:在take
或poll(timeout)
成功返回或抛出异常退出时,会检查是否需要signal()
。如果当前没有 Leader(leader == null
)并且队列非空 (q.peek() != null
),说明可能需要唤醒一个 Follower 来接替检查队首元素状态。
难点理解:
Leader-Follower 模式如何工作?
想象一个场景:队列里有一个 10 秒后到期的任务 A。
- 线程 T1 调用
take()
,发现 A 未到期,没有 Leader,T1 成为 Leader,调用available.awaitNanos(10秒)
。- 线程 T2 调用
take()
,发现 A 未到期,但leader
是 T1,于是 T2 调用available.await()
进入无限等待(成为 Follower)。- 线程 T3 调用
take()
,同 T2,也成为 Follower。- 10 秒后,T1 从
awaitNanos
返回(或者期间有新元素 B 加入导致 T1 被signal
提前唤醒)。T1 重置leader = null
。- T1 再次检查队首(可能是 A 或 B),如果到期,取走并返回。在
finally
块中,发现leader == null
且队列可能非空,调用available.signal()
。- 被唤醒的可能是 T2 或 T3。假设 T2 被唤醒,它会回到循环开始,检查队首。如果队首元素未到期,它可能会成为新的 Leader。
这种方式避免了 T1, T2, T3 在 A 到期时都被唤醒,然后竞争锁去检查 A 是否到期的混乱局面。
3.5 SynchronousQueue:不存储元素的“直接传递”队列
SynchronousQueue
是一个非常特殊的 BlockingQueue
实现,它内部没有容量,或者说容量为 0。它不存储任何元素。
核心特点:
- 零容量 (Zero Capacity):它不像其他队列那样有缓冲区。每个
put
操作必须等待一个对应的take
操作,反之亦然。put
线程和take
线程必须直接配对才能完成元素的传递。 - 直接传递 (Direct Handoff):当一个线程调用
put(e)
时,它会阻塞,直到另一个线程调用take()
。当take
线程准备好接收时,put
线程才会把元素e
直接传递给take
线程,然后两个线程都可以继续执行。反向操作 (take
先等待put
) 也是如此。 offer
/poll
行为:offer(e)
:只有当恰好有另一个线程正在等待take()
时,才会成功(返回true
),否则立即返回false
。它不会阻塞等待配对。poll()
:只有当恰好有另一个线程正在等待put()
时,才会成功(返回元素),否则立即返回null
。- 带超时的
offer/poll
:会在指定时间内等待配对的线程出现。
- 不允许
null
元素。 - 公平性 (Fairness):可以在构造时指定公平策略 (默认非公平)。
- 非公平模式 (默认):使用栈 (LIFO) 来管理等待的线程。后来的请求可能先被满足。性能通常更高。
- 公平模式:使用队列 (FIFO) 来管理等待的线程。等待时间最长的线程会优先配对。保证公平性。
peek()
,iterator()
,size()
等方法:peek()
永远返回null
。iterator()
是空的。size()
永远返回 0。因为它根本不存储元素。
适用场景:
- 高并发、低延迟的生产者-消费者场景,其中生产一个元素后希望立即被消费,或者消费一个元素前希望立即有生产者提供。它适合传递性工作,一个任务的处理结果直接作为下一个任务的输入。
- 线程池中的使用:
Executors.newCachedThreadPool()
使用SynchronousQueue
。当提交一个新任务时:- 如果有空闲的工作线程正在
take()
等待任务,任务直接交给它执行。 - 如果没有空闲线程,
SynchronousQueue
的offer
会失败(因为没有等待的take
),线程池会创建新的工作线程来处理任务。 - 这使得
CachedThreadPool
能够根据负载动态调整线程数量,并且任务传递非常高效。
- 如果有空闲的工作线程正在
- 实现线程间的一对一信号/数据交换。
源码浅析 (概念性,实际源码复杂,涉及内部 Transferer 接口和 Stack/Queue 实现):
SynchronousQueue
的内部实现相当复杂,它定义了一个 Transferer
内部接口,并有两个主要实现:TransferStack
(非公平) 和 TransferQueue
(公平)。这些实现使用 CAS (Compare-and-Swap) 原子操作和复杂的等待节点状态管理来实现高效的线程配对和数据传递,尽量避免使用重量级的锁。
概念模型 (非公平 TransferStack 为例):
- 等待节点 (SNode):当一个线程 (如
put
线程 P) 无法立即找到配对线程 (如take
线程 T) 时,它会创建一个代表自己操作(REQUEST
表示take
,DATA
表示put
)和数据(如果是put
)的等待节点SNode
。 - 栈顶 (head):
TransferStack
维护一个指向等待节点栈顶的原子引用head
。 put
操作:put
线程 P 尝试将一个包含数据e
的DATA
类型节点推入栈顶。- 在推入前,它会检查栈顶节点
h
:- 如果
h
是REQUEST
类型(表示有take
线程在等待),P 不推入自己的节点,而是尝试匹配h
。它会尝试通过 CAS 将h
的match
字段指向自己的DATA
节点,并唤醒等待在h
上的take
线程 T。如果匹配成功,P 将数据e
交给 T,然后 P 和 T 都完成操作。 - 如果
h
不是REQUEST
(栈为空或栈顶是DATA
),P 尝试通过 CAS 将自己的DATA
节点设为新的head
。 - 如果 CAS 成功,P 的节点成为新的栈顶,P 就阻塞等待,直到被一个
take
线程匹配并唤醒。 - 如果 CAS 失败(说明栈顶被其他线程修改了),P 重试整个过程。
- 如果
take
操作:take
线程 T 尝试将一个REQUEST
类型节点推入栈顶。- 在推入前,检查栈顶节点
h
:- 如果
h
是DATA
类型(表示有put
线程 P 在等待并持有数据),T 不推入自己的节点,而是尝试匹配h
。它通过 CAS 将h
的match
字段指向自己的REQUEST
节点,并唤醒等待在h
上的put
线程 P。如果匹配成功,T 从h
节点获取数据e
,然后 T 和 P 都完成操作。 - 如果
h
不是DATA
(栈为空或栈顶是REQUEST
),T 尝试通过 CAS 将自己的REQUEST
节点设为新的head
。 - 如果 CAS 成功,T 的节点成为新的栈顶,T 就阻塞等待,直到被一个
put
线程匹配并唤醒,并接收到数据。 - 如果 CAS 失败,T 重试。
- 如果
公平模式 (TransferQueue) 使用类似的思想,但等待节点组织成 FIFO 队列,确保先来的线程先被匹配。
// 这是一个高度简化的概念性代码,并非实际源码
public class SynchronousQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {// 内部传输机制 (TransferStack 或 TransferQueue)private transient volatile Transferer<E> transferer;// 构造函数 (可以选择公平性)public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();}// 默认非公平public SynchronousQueue() { this(false); }// put 方法 (委托给 transferer)public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();// transfer 方法的 E_NOW 模式表示立即传输,如果不能立即匹配就阻塞等待if (transferer.transfer(e, false, 0) == null) { // false: 不是超时模式, 0: 超时时间 N/AThread.interrupted(); // 清除中断状态throw new InterruptedException();}}// take 方法 (委托给 transferer)public E take() throws InterruptedException {// transfer 方法的 R_NOW 模式表示立即接收,如果不能立即匹配就阻塞等待E e = transferer.transfer(null, false, 0); // null: 表示是 take 操作if (e != null)return e;Thread.interrupted();throw new InterruptedException();}// offer 方法 (非阻塞)public boolean offer(E e) {if (e == null) throw new NullPointerException();// transfer 方法的 E_TRY 模式表示尝试立即传输,不阻塞return transferer.transfer(e, true, 0) != null; // true: 是超时(尝试)模式, 0: 超时时间 N/A}// poll 方法 (非阻塞)public E poll() {// transfer 方法的 R_TRY 模式表示尝试立即接收,不阻塞return transferer.transfer(null, true, 0);}// 带超时的 offer/poll (委托给 transferer, 使用 E_TIMED/R_TIMED 模式)public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {if (e == null) throw new NullPointerException();if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) // true: 超时模式return true;if (!Thread.interrupted()) // 如果不是因为中断而失败return false;throw new InterruptedException();}public E poll(long timeout, TimeUnit unit) throws InterruptedException {E e = transferer.transfer(null, true, unit.toNanos(timeout));if (e != null || !Thread.interrupted())return e;throw new InterruptedException();}// --- 内部 Transferer 接口和实现 (高度简化概念) ---abstract static class Transferer<E> {// item: put 的数据 或 take 的 null// timed: 是否是 try/timed 模式 (true) 还是 now 模式 (false)// nanos: 超时时间 (timed 为 true 时有效)// 返回: take 时返回数据,put 时返回非 null (表示成功),超时或中断返回 nullabstract E transfer(E item, boolean timed, long nanos);}// TransferStack/TransferQueue 的实现细节非常复杂,涉及 CAS、自旋、// 等待节点状态 (WAITING, MATCHED, CANCELLED) 等,这里不再展开。// ... size(), isEmpty(), peek() 等方法返回固定值或空 ...
}
难点理解:
SynchronousQueue 性能为何高?
- 无存储开销:不需要管理缓冲区数组或链表,没有入队出队的索引/指针操作,没有元素复制。
- 直接传递:数据直接从生产者线程的栈传递到消费者线程的栈(逻辑上),减少了中间环节。
- 高度优化的同步机制:内部大量使用 CAS 原子操作和自旋等待,尽量避免进入重量级的锁阻塞状态,特别是在非公平模式下。只有在无法立即匹配且自旋几次后,线程才会真正挂起。
这使得在生产者和消费者速率匹配良好的情况下,
SynchronousQueue
的吞吐量非常高,延迟极低。但如果速率严重不匹配,会导致大量线程阻塞等待。
3.6 LinkedTransferQueue:更强大的无界链表队列
LinkedTransferQueue
是 Java 7 引入的一个无界的、基于链表的 BlockingQueue
实现。它融合了 LinkedBlockingQueue
的链表结构和 SynchronousQueue
的直接传递特性,并提供了更强大的 transfer
方法。
核心特点:
- 无界 (Unbounded):容量为
Integer.MAX_VALUE
。put
/offer
永不阻塞(除非 OOM)。 - 底层结构:基于链表,类似于
LinkedBlockingQueue
,但节点结构更复杂,包含更多状态信息以支持transfer
操作。 - 双重数据结构 (Dual Data Structure):内部巧妙地使用 CAS 操作维护链表结构,使其既可以像普通队列一样 FIFO 存储元素,也可以支持类似
SynchronousQueue
的匹配和直接传递。 - 核心方法
transfer(E e)
:这是LinkedTransferQueue
的特色方法。- 如果当前有消费者线程正在等待 (
take()
或poll()
超时等待中),transfer
会立即将元素e
直接传递给一个等待的消费者,然后返回true
,不入队。 - 如果没有等待的消费者,
transfer
会将元素e
加入队列尾部,然后阻塞,直到这个元素e
被某个消费者线程take
或poll
走为止。 - 它就像是
SynchronousQueue
的put
和LinkedBlockingQueue
的put
的结合体:优先尝试直接传递,如果不行再入队并等待被消费。
- 如果当前有消费者线程正在等待 (
- 其他方法行为:
put(e)
/offer(e)
:总是将元素添加到队尾,不阻塞,行为类似无界的LinkedBlockingQueue
。它们不会尝试直接传递。tryTransfer(E e)
:尝试立即将元素e
传递给一个等待的消费者。如果成功则返回true
(不入队);如果没有等待的消费者,则不入队,立即返回false
。这是transfer
的非阻塞版本。tryTransfer(E e, long timeout, TimeUnit unit)
:带超时的transfer
。尝试传递,如果没有消费者,则将元素入队,并等待指定时间看是否被消费。take()
/poll()
:行为与LinkedBlockingQueue
类似,从队首获取元素。如果队列为空,take
阻塞,poll
返回null
。它们也会与正在transfer
或tryTransfer
的生产者进行匹配。
- 高性能:内部实现大量使用 CAS 原子操作,在很多场景下比
LinkedBlockingQueue
和SynchronousQueue
性能更好,尤其是在混合了直接传递和缓冲需求的场景中。
适用场景:
- 需要高性能、高吞吐量的消息传递或任务分发系统。
- 希望结合
SynchronousQueue
的低延迟直接传递和LinkedBlockingQueue
的缓冲能力。 - 生产者希望知道自己的数据/任务是否被消费者立即接收处理(使用
transfer
)。 - 实现更复杂的生产者-消费者协作模式。
源码复杂度:LinkedTransferQueue
的实现是 JUC 包中最复杂的之一,涉及到精妙的双重链表(dual queue)思想和复杂的 CAS 状态转换,其源码阅读难度较大。
3.7 LinkedBlockingDeque:双端阻塞队列
LinkedBlockingDeque
是一个基于链表实现的双端阻塞队列。Deque 的意思是 Double Ended Queue。
核心特点:
- 双端操作:可以在队列的两端(头部和尾部)进行元素的插入和移除操作。
- 阻塞性:所有插入和移除操作都提供了阻塞版本 (
putFirst
,putLast
,takeFirst
,takeLast
) 和非阻塞/超时版本 (offerFirst
,offerLast
,pollFirst
,pollLast
)。 - 可选有界:构造时可以指定容量,默认为
Integer.MAX_VALUE
(无界)。 - 底层结构:双向链表。
- 单锁机制:使用一个
ReentrantLock
和两个Condition
(notEmpty
,notFull
) 来控制并发,类似于ArrayBlockingQueue
。因此,所有操作(无论在头还是尾)都是互斥的。 - 不允许
null
元素。
适用场景:
- 工作窃取 (Work Stealing) 算法:这是
LinkedBlockingDeque
最典型的应用场景。在 Fork/Join 框架或一些并行计算模型中,每个工作线程都有自己的双端队列。- 线程优先处理自己队列头部的任务 (
takeFirst
)。 - 当自己队列为空时,线程会尝试从其他线程队列的尾部 (
takeLast
) “窃取”一个任务来执行。 - 这种策略可以很好地平衡负载,减少线程空闲时间。
Deque
的双端特性使得自身任务处理(LIFO,后进先出,可能利用缓存局部性)和任务窃取(FIFO,先产生任务先被偷走,减少任务饥饿)可以高效进行。
- 线程优先处理自己队列头部的任务 (
- 需要同时从队列两端进行操作的其他场景。
与 LinkedBlockingQueue 的对比:
Deque
支持两端操作,Queue
只支持尾部入队、头部出队。LinkedBlockingDeque
使用单锁,LinkedBlockingQueue
使用双锁。因此在高并发下,如果操作主要集中在一端,LinkedBlockingQueue
的吞吐量可能更高;但如果需要在两端灵活操作,LinkedBlockingDeque
提供了必要的功能。
4. BlockingQueue 实现原理深入:锁与条件变量
理解 BlockingQueue
的工作原理,核心在于理解其背后的同步机制:锁 (Lock) 和 条件变量 (Condition)。虽然 SynchronousQueue
和 LinkedTransferQueue
大量使用 CAS 优化,但 ArrayBlockingQueue
, LinkedBlockingQueue
, PriorityBlockingQueue
, DelayQueue
, LinkedBlockingDeque
这些更经典的实现,其阻塞和唤醒机制的基础都是 java.util.concurrent.locks
包下的 ReentrantLock
和 Condition
。
回顾生产者-消费者问题:
我们需要一个共享的缓冲区(队列),生产者向里放东西,消费者从里面取东西。
- 问题1:线程安全。多个生产者/消费者同时访问队列,如何保证数据不错乱?
- 解决方案:使用锁。每次访问队列前获取锁,访问结束后释放锁。同一时间只有一个线程能持有锁,保证了操作的原子性和内存可见性。
ReentrantLock
是 JUC 提供的比synchronized
更灵活、功能更强的锁实现。
- 解决方案:使用锁。每次访问队列前获取锁,访问结束后释放锁。同一时间只有一个线程能持有锁,保证了操作的原子性和内存可见性。
- 问题2:队列满/空时的等待与通知。生产者发现队列满了怎么办?消费者发现队列空了怎么办?总不能一直空转浪费 CPU 吧?
- 解决方案:使用条件变量 (Condition)。条件变量总是与一个锁关联。它提供了
await()
,signal()
,signalAll()
方法。await()
: 当线程发现条件不满足时(如生产者发现队列满),它可以在关联的条件变量上调用await()
。这个操作会原子地释放当前线程持有的锁,并将线程置于等待状态,直到被其他线程signal
或signalAll
唤醒,或者被中断。当线程被唤醒后,它会重新尝试获取之前释放的锁,获取成功后才能从await()
返回继续执行。signal()
: 唤醒一个在该条件变量上等待的线程。具体唤醒哪个线程是不确定的(取决于实现,通常是等待队列中的第一个)。被唤醒的线程需要重新竞争锁。signalAll()
: 唤醒所有在该条件变量上等待的线程。所有被唤醒的线程都需要重新竞争锁。
- 解决方案:使用条件变量 (Condition)。条件变量总是与一个锁关联。它提供了
以 ArrayBlockingQueue
为例看锁与条件变量的应用:
// 再看关键部分
final ReentrantLock lock; // 唯一的锁
private final Condition notEmpty; // 队列不空条件 (消费者等待)
private final Condition notFull; // 队列不满条件 (生产者等待)// put (生产者)
lock.lockInterruptibly();
try {while (count == items.length) { // 条件:队列满notFull.await(); // 释放 lock,在 notFull 上等待}enqueue(e); // 条件满足,执行操作notEmpty.signal(); // 唤醒一个可能在 notEmpty 上等待的消费者
} finally {lock.unlock();
}// take (消费者)
lock.lockInterruptibly();
try {while (count == 0) { // 条件:队列空notEmpty.await(); // 释放 lock,在 notEmpty 上等待}E x = dequeue(); // 条件满足,执行操作notFull.signal(); // 唤醒一个可能在 notFull 上等待的生产者
} finally {lock.unlock();
}
流程梳理 (put 为例):
- 生产者线程 P 调用
put()
,首先尝试获取lock
。如果锁被其他线程(可能是另一个生产者 P’ 或消费者 C)持有,P 阻塞等待锁。 - P 成功获取
lock
。 - P 检查条件
while (count == items.length)
。 - 如果条件满足 (队列已满):
- P 调用
notFull.await()
。 await()
方法内部:- 将 P 线程加入到
notFull
条件的等待队列中。 - 完全释放 P 持有的
lock
(这是关键!允许其他线程获取锁,比如消费者 C)。 - 阻塞 P 线程。
- 将 P 线程加入到
- P 调用
- 如果条件不满足 (队列未满):
- P 执行
enqueue(e)
,修改队列状态 (添加元素,增加count
)。 - P 调用
notEmpty.signal()
。这个信号是发给另一个条件变量notEmpty
的,目的是唤醒可能因为队列空而等待的消费者线程 C。 - P 在
finally
块中释放lock
。
- P 执行
- 唤醒过程:假设之前有消费者线程 C 因为队列空而在
notEmpty.await()
处等待。当某个生产者 P’ (或者就是刚才的 P) 执行完enqueue
并调用notEmpty.signal()
时:- 等待在
notEmpty
条件队列中的 C 线程被唤醒。 - 被唤醒的 C 不会立即执行,它需要重新尝试获取
lock
。 - 如果 C 成功获取
lock
,它将从之前的notEmpty.await()
调用处返回。 - 因为
await()
在while
循环中,C 会再次检查条件while (count == 0)
。这次检查通常会失败(因为队列已被 P’ 放入元素),循环结束。 - C 执行
dequeue()
,然后调用notFull.signal()
唤醒可能等待的生产者,最后释放lock
。
- 等待在
为什么需要 while
循环检查条件 (防止虚假唤醒)?
规范允许
await()
方法在没有被signal
/signalAll
或中断的情况下意外返回,这种情况称为虚假唤醒 (Spurious Wakeup)。虽然罕见,但必须处理。如果使用if (condition) await();
,线程被虚假唤醒后,会跳过if
直接执行后续代码,但此时条件可能并未真正满足,导致程序错误。使用while (condition) await();
可以确保线程每次被唤醒后都重新检查条件,只有条件真正满足时才退出循环继续执行,从而正确处理虚假唤醒。
signal()
vs signalAll()
的选择:
ArrayBlockingQueue
,LinkedBlockingQueue
等通常使用signal()
。因为每次入队/出队操作只会让一个等待的线程(一个生产者或一个消费者)的条件可能被满足,唤醒一个就够了,效率更高。- 在某些复杂的同步场景中,一个事件可能使得多个等待线程的条件都满足,或者难以判断应该唤醒哪个线程时,使用
signalAll()
更安全(虽然可能导致更多线程竞争锁,有所谓的“惊群效应” Thundering Herd)。
内存可见性保证 (Happens-Before):
ReentrantLock
的 lock()
和 unlock()
操作以及 Condition
的 await()
, signal()
, signalAll()
操作都具有 happens-before
关系保证:
- 对一个锁的
unlock
操作 happens-before 后续对同一个锁的lock
操作。 - 将元素加入条件等待队列的操作 happens-before 从
await()
返回的操作。 signal
/signalAll
操作 happens-before 被唤醒线程从await()
返回的操作。
这意味着,一个线程在释放锁之前对共享变量(如 count
, items
数组)的修改,对于后续成功获取该锁的线程来说是可见的。同样,生产者放入队列的元素,对于之后被唤醒并成功 take
的消费者来说是可见的。这是 BlockingQueue
线程安全的基础保障之一。
5. 如何选择合适的 BlockingQueue?
面对如此多的 BlockingQueue
实现,如何选择最适合自己需求的那一个?可以从以下几个维度考虑:
- 有界 vs 无界 (Bounded vs Unbounded)
- 需要严格控制资源消耗,防止队列无限增长导致内存溢出? -> 选择有界队列 (
ArrayBlockingQueue
,LinkedBlockingQueue(指定容量)
,LinkedBlockingDeque(指定容量)
)。这是保证系统稳定性的重要手段。 - 不确定队列大小,或者希望队列尽可能大,生产者不应被阻塞? -> 考虑无界队列 (
LinkedBlockingQueue
,PriorityBlockingQueue
,DelayQueue
,LinkedTransferQueue
,LinkedBlockingDeque
)。但务必注意:无界队列可能隐藏生产者过快的问题,最终可能耗尽内存。必须确保消费者有能力跟上,或者有其他机制来限制生产者速率。
- 需要严格控制资源消耗,防止队列无限增长导致内存溢出? -> 选择有界队列 (
- 性能与吞吐量
- 追求最高并发吞吐量,尤其是在多核、高竞争环境下? -> 优先考虑锁分离的
LinkedBlockingQueue
或更优化的LinkedTransferQueue
。 - 并发度不高,或生产者消费者速率相对平衡,更看重内存稳定性和实现的简洁性? ->
ArrayBlockingQueue
是个不错的选择。 - 需要零缓冲、最低延迟的直接传递? ->
SynchronousQueue
是不二之选(但需注意其适用场景)。
- 追求最高并发吞吐量,尤其是在多核、高竞争环境下? -> 优先考虑锁分离的
- 公平性 (Fairness)
- 是否需要保证线程按等待顺序获取锁/元素,防止饥饿? -> 选择支持公平模式的
ArrayBlockingQueue(fair=true)
或SynchronousQueue(fair=true)
。注意公平性通常会牺牲一些性能。 - 对公平性无要求,追求更高性能? -> 使用默认的非公平模式。
- 是否需要保证线程按等待顺序获取锁/元素,防止饥饿? -> 选择支持公平模式的
- 元素排序或延迟
- 需要根据元素的优先级处理? ->
PriorityBlockingQueue
。 - 需要元素在延迟到期后才能被处理? ->
DelayQueue
。
- 需要根据元素的优先级处理? ->
- 队列操作方式
- 只需要标准的 FIFO 队列操作? ->
ArrayBlockingQueue
,LinkedBlockingQueue
,PriorityBlockingQueue
等。 - 需要从队列两端进行操作(如工作窃取)? ->
LinkedBlockingDeque
。 - 需要
transfer
这种更强的传递语义(尝试直接传递,不行再入队并等待消费)? ->LinkedTransferQueue
。
- 只需要标准的 FIFO 队列操作? ->
选择决策树 (简化版):
graph TDA{需求场景?} --> B{需要优先级?};B -- 是 --> C[PriorityBlockingQueue];B -- 否 --> D{需要延迟执行?};D -- 是 --> E[DelayQueue];D -- 否 --> F{需要零缓冲直接传递?};F -- 是 --> G[SynchronousQueue];F -- 否 --> H{需要双端操作?};H -- 是 --> I[LinkedBlockingDeque];H -- 否 --> J{是否必须有界?};J -- 是 --> K{选 Array 还是 Linked?};K -- Array --> L[ArrayBlockingQueue (内存稳定,可选公平)];K -- Linked --> M[LinkedBlockingQueue(指定容量) (更高吞吐潜力)];J -- 否/可选 --> N{追求最高性能/Transfer语义?};N -- 是 --> O[LinkedTransferQueue];N -- 否 --> P[LinkedBlockingQueue (默认无界,高吞吐)];style C fill:#f9f,stroke:#333,stroke-width:2pxstyle E fill:#f9f,stroke:#333,stroke-width:2pxstyle G fill:#f9f,stroke:#333,stroke-width:2pxstyle I fill:#f9f,stroke:#333,stroke-width:2pxstyle L fill:#ccf,stroke:#333,stroke-width:2pxstyle M fill:#ccf,stroke:#333,stroke-width:2pxstyle O fill:#cfc,stroke:#333,stroke-width:2pxstyle P fill:#cfc,stroke:#333,stroke-width:2px
总结建议:
- 一般场景:
LinkedBlockingQueue
通常是首选,因为它提供了较好的吞吐量和灵活性(可选有界)。 - 内存敏感或需要公平性:考虑
ArrayBlockingQueue
。 - 需要特定功能:根据优先级、延迟、双端、直接传递等需求选择
PriorityBlockingQueue
,DelayQueue
,LinkedBlockingDeque
,SynchronousQueue
,LinkedTransferQueue
。 - 对无界队列保持警惕:使用无界队列时,一定要监控内存使用情况,并考虑是否有潜在的 OOM 风险。
6. BlockingQueue 实战应用示例
理论结合实践是最好的学习方式。下面我们通过几个例子来展示 BlockingQueue
的应用。
6.1 示例一:实现请求批处理组件
场景:假设有一个服务,需要接收大量并发请求,但后端存储(如数据库)不希望被瞬时高并发冲垮,而是希望将请求缓存起来,达到一定数量或等待一段时间后,批量写入数据库。
实现思路:
- 使用
BlockingQueue
(如LinkedBlockingQueue
) 作为请求的缓存队列。 - 启动一个单独的后台处理线程。
- 处理线程循环执行:
- 尝试从队列中获取一个请求,设置一个超时时间(例如 1 秒)。
poll(timeout)
- 如果成功获取到第一个请求:
- 将该请求加入当前批次
List
。 - 立即尝试从队列中拉取更多请求(非阻塞),直到达到批处理数量阈值或队列为空。可以使用
drainTo()
方法高效完成。 - 处理整个批次的请求(例如批量写入数据库)。
- 清空批次
List
。
- 将该请求加入当前批次
- 如果超时仍未获取到请求(说明队列在超时时间内为空),可以执行一些空闲逻辑,或者直接进入下一轮循环等待。
- 尝试从队列中获取一个请求,设置一个超时时间(例如 1 秒)。
- 提供一个
addRequest
方法供外部调用,将请求放入队列(可以使用带超时的offer
防止队列过载时无限阻塞)。 - 提供
start
和shutdown
方法来控制处理线程的生命周期。shutdown
时需要处理队列中剩余的请求。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;/*** 通用请求批处理组件* @param <T> 请求类型*/
public class BatchRequestProcessor<T> {private final BlockingQueue<T> requestQueue; // 缓存请求的阻塞队列private final int batchSize; // 批处理大小阈值private final long timeoutMs; // 处理时间窗口/超时时间 (毫秒)private final Consumer<List<T>> processor; // 实际的批处理器 (如写入数据库)private volatile boolean running = true; // 控制处理线程运行状态private Thread processorThread; // 处理线程/*** 构造函数* @param capacity 队列容量* @param batchSize 批处理大小* @param timeoutMs 超时时间(毫秒),达到此时间即使未满一批也会处理* @param processor 批处理器*/public BatchRequestProcessor(int capacity, int batchSize, long timeoutMs, Consumer<List<T>> processor) {if (capacity <= 0 || batchSize <= 0 || timeoutMs <= 0 || processor == null) {throw new IllegalArgumentException("Invalid arguments");}// 使用 LinkedBlockingQueue,可以限制容量防止 OOMthis.requestQueue = new LinkedBlockingQueue<>(capacity);this.batchSize = batchSize;this.timeoutMs = timeoutMs;this.processor = processor;this.processorThread = new Thread(this::processRequests, "BatchProcessorThread");this.processorThread.setDaemon(false); // 通常设为非守护线程,确保能处理完剩余任务}/*** 向队列中添加请求 (带超时,避免无限阻塞)* @param request 请求对象* @return 是否成功添加到队列* @throws InterruptedException 如果线程在等待时被中断*/public boolean addRequest(T request) throws InterruptedException {if (!running) {System.err.println("Processor is shutdown. Cannot add request.");return false;}// 使用 offer 带超时,给 100ms 尝试入队,失败则认为队列满/系统繁忙return requestQueue.offer(request, 100, TimeUnit.MILLISECONDS);}/*** 启动批处理线程*/public void start() {if (processorThread.isAlive()) {System.out.println("Processor already started.");return;}running = true;processorThread.start();System.out.println("Batch Processor started.");}/*** 停止批处理线程 (会处理完队列中剩余的请求)*/public void shutdown() throws InterruptedException {System.out.println("Shutting down Batch Processor...");running = false;// 中断处理线程,使其从 poll(timeout) 或 await() 中醒来processorThread.interrupt();// 等待处理线程结束processorThread.join();System.out.println("Batch Processor shutdown complete.");}// 处理线程的核心逻辑private void processRequests() {List<T> batch = new ArrayList<>(batchSize); // 预分配批次列表容量long lastProcessTime = System.currentTimeMillis(); // 记录上次处理时间while (running || !requestQueue.isEmpty()) { // 运行时 或 停止后队列不为空时 继续处理try {// 核心:使用 poll 带超时获取第一个元素// 超时时间动态计算,保证至少 timeoutMs 会触发一次检查/处理long currentTime = System.currentTimeMillis();long waitTime = timeoutMs - (currentTime - lastProcessTime);if (waitTime <= 0) waitTime = timeoutMs; // 保证至少等待 timeoutMsT firstRequest = requestQueue.poll(waitTime, TimeUnit.MILLISECONDS);if (firstRequest != null) {// 获取到第一个请求,加入批次batch.add(firstRequest);// 关键优化:使用 drainTo 非阻塞地批量获取剩余元素,直到满批或队列空// drainTo 会将元素从队列移除并添加到 batch 列表// 第二个参数是最大获取数量requestQueue.drainTo(batch, batchSize - 1); }// 检查是否需要处理 (有元素 或 达到时间窗口)// 注意:即使 poll 超时返回 null,如果时间窗口到了也可能需要处理 (虽然批次是空的)// 但我们这里简化为:只有批次不为空时才处理if (!batch.isEmpty()) {System.out.println(Thread.currentThread().getName() + " processing batch of size: " + batch.size());try {// 调用外部传入的处理器执行实际操作processor.accept(new ArrayList<>(batch)); // 传递副本,防止处理器修改原 batch} catch (Exception e) {// 处理异常,避免中断整个处理线程System.err.println("Error processing batch: " + e.getMessage());e.printStackTrace();// 这里可以添加错误处理逻辑,比如记录失败的批次} finally {// 清空批次,为下一轮准备batch.clear();lastProcessTime = System.currentTimeMillis(); // 更新处理时间}}// 如果 running 为 false 且队列已空,则退出循环if (!running && requestQueue.isEmpty()) {break;}} catch (InterruptedException e) {// 捕获中断信号 (通常是 shutdown 时发出)System.out.println(Thread.currentThread().getName() + " interrupted. Checking running state...");// 如果是正常关闭,循环会在下一轮判断 running && !queue.isEmpty() 时自然退出// 如果需要在中断后立即退出,可以在这里 break// Thread.currentThread().interrupt(); // 重新设置中断状态 (如果需要向上传递)if (!running) {System.out.println("Shutdown initiated, exiting processing loop soon.");}} catch (Exception e) {// 捕获其他运行时异常,保证线程继续运行System.err.println("Unexpected error in processing loop: " + e.getMessage());e.printStackTrace();}}System.out.println(Thread.currentThread().getName() + " processing loop finished.");// 最后再检查一次队列,以防万一在退出循环判断后又有元素加入 (虽然理论上不应该)if (!requestQueue.isEmpty()) {System.out.println("Processing remaining items after loop exit...");List<T> remaining = new ArrayList<>();requestQueue.drainTo(remaining);if (!remaining.isEmpty()) {try {processor.accept(remaining);} catch (Exception e) {System.err.println("Error processing final remaining batch: " + e.getMessage());e.printStackTrace();}}}}// 示例用法public static void main(String[] args) throws InterruptedException {// 创建处理器:队列容量 1000, 批大小 10, 超时 1000ms (1秒)// 处理器逻辑:打印批次内容BatchRequestProcessor<String> processor = new BatchRequestProcessor<>(1000, 10, 1000, batch -> {System.out.println("--- Processing Batch ---");for (String req : batch) {System.out.println("Processing: " + req);// 模拟处理耗时try { TimeUnit.MILLISECONDS.sleep(5); } catch (InterruptedException ignored) {}}System.out.println("--- Batch Done ---");});// 启动处理器processor.start();// 模拟并发添加请求Thread producer1 = new Thread(() -> {for (int i = 0; i < 25; i++) {try {if (!processor.addRequest("Request-P1-" + i)) {System.err.println("P1 Failed to add request " + i);}TimeUnit.MILLISECONDS.sleep(20); // 模拟请求间隔} catch (InterruptedException e) { Thread.currentThread().interrupt(); }}});Thread producer2 = new Thread(() -> {for (int i = 0; i < 33; i++) {try {if (!processor.addRequest("Request-P2-" + i)) {System.err.println("P2 Failed to add request " + i);}TimeUnit.MILLISECONDS.sleep(30);} catch (InterruptedException e) { Thread.currentThread().interrupt(); }}});producer1.start();producer2.start();// 等待生产者完成producer1.join();producer2.join();System.out.println("Producers finished.");// 等待一段时间让处理器处理剩余请求TimeUnit.SECONDS.sleep(3);// 关闭处理器processor.shutdown();}
}
代码亮点:
- 封装性:将批处理逻辑封装在
BatchRequestProcessor
类中,使用者只需提供处理逻辑Consumer
。 - 阻塞队列核心:使用
LinkedBlockingQueue
作为缓冲区,自动处理线程安全。限制容量防止内存耗尽。 - 双触发机制:
- 数量触发:
drainTo
批量获取,尽快达到batchSize
。 - 时间触发:
poll(waitTime, ...)
保证即使请求量少,也会在timeoutMs
左右触发一次处理检查。
- 数量触发:
drainTo()
优化:比循环调用poll()
更高效地批量获取元素。- 优雅关闭 (
shutdown
):通过volatile boolean running
标志和interrupt()
来通知处理线程停止,并确保处理完队列中剩余的元素。 - 异常处理:处理了
InterruptedException
和批处理逻辑可能抛出的异常,保证处理线程的健壮性。
6.2 示例二:经典的生产者-消费者模式
这是 BlockingQueue
最基础也是最重要的应用。
场景:一个或多个生产者线程生成数据(如任务、消息),放入 BlockingQueue
;一个或多个消费者线程从 BlockingQueue
取出数据进行处理。
实现思路:
- 创建一个共享的
BlockingQueue
实例(例如ArrayBlockingQueue
或LinkedBlockingQueue
)。 - 创建生产者
Runnable
:- 在循环中生成数据。
- 调用
queue.put(data)
将数据放入队列。如果队列满,put
会自动阻塞。 - 需要一种方式来通知消费者生产结束(可选,但常用)。一种方法是定义一个特殊的“毒丸 (Poison Pill)”对象,生产者完成生产后,向队列中放入与消费者数量相等的毒丸。
- 创建消费者
Runnable
:- 在循环中调用
data = queue.take()
从队列获取数据。如果队列空,take
会自动阻塞。 - 检查获取到的数据是否是毒丸。如果是,则消费者线程结束循环。
- 如果不是毒丸,则处理数据。
- 在循环中调用
- 在主线程中创建并启动生产者和消费者线程。
- 等待所有线程结束(使用
join()
)。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;public class ProducerConsumerWithBlockingQueue {// 使用 Integer 作为数据类型,-1 作为毒丸private static final int POISON_PILL = -1;// 生产者 Runnablestatic class Producer implements Runnable {private final BlockingQueue<Integer> queue;private final int productionCount; // 每个生产者生产的数量private final int poisonPillCount; // 需要放入的毒丸数量 (等于消费者总数)private final AtomicInteger idCounter; // 用于生成唯一产品 IDProducer(BlockingQueue<Integer> queue, int productionCount, int poisonPillCount, AtomicInteger idCounter) {this.queue = queue;this.productionCount = productionCount;this.poisonPillCount = poisonPillCount;this.idCounter = idCounter;}@Overridepublic void run() {try {for (int i = 0; i < productionCount; i++) {int data = idCounter.getAndIncrement(); // 生成数据System.out.println(Thread.currentThread().getName() + " Producing: " + data);queue.put(data); // 放入队列 (可能阻塞)TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(50, 150)); // 模拟生产耗时}System.out.println(Thread.currentThread().getName() + " Finished producing. Putting poison pills...");// 生产结束后,放入毒丸通知所有消费者for (int i = 0; i < poisonPillCount; i++) {queue.put(POISON_PILL);}System.out.println(Thread.currentThread().getName() + " Put poison pills. Exiting.");} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println(Thread.currentThread().getName() + " Producer interrupted.");}}}// 消费者 Runnablestatic class Consumer implements Runnable {private final BlockingQueue<Integer> queue;Consumer(BlockingQueue<Integer> queue) {this.queue = queue;}@Overridepublic void run() {try {while (true) {Integer data = queue.take(); // 从队列获取数据 (可能阻塞)// 检查是否是毒丸if (data == POISON_PILL) {System.out.println(Thread.currentThread().getName() + " Received poison pill. Exiting.");// 注意:如果希望其他消费者也能收到毒丸,需要把毒丸再放回去// queue.put(POISON_PILL); // (可选,取决于你的结束策略)// 在本例中,生产者会为每个消费者放入一个毒丸,所以不需要消费者再放回去。break; // 退出循环}// 处理数据System.out.println(Thread.currentThread().getName() + " Consuming: " + data);TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100, 300)); // 模拟消费耗时}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println(Thread.currentThread().getName() + " Consumer interrupted.");}}}public static void main(String[] args) throws InterruptedException {int queueCapacity = 10; // 队列容量int numProducers = 2; // 生产者数量int numConsumers = 3; // 消费者数量int itemsPerProducer = 15; // 每个生产者生产数量// 选择 BlockingQueue 实现,这里用 ArrayBlockingQueueBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueCapacity);// BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(queueCapacity); // 或者用 LinkedBlockingQueueAtomicInteger idCounter = new AtomicInteger(0); // 用于生成唯一产品 IDExecutorService executorService = Executors.newCachedThreadPool(); // 使用线程池管理线程System.out.println("Starting simulation...");// 启动消费者for (int i = 0; i < numConsumers; i++) {executorService.submit(new Consumer(queue));}// 启动生产者// 每个生产者需要知道要为多少个消费者放入毒丸int poisonPillsPerProducer = (int) Math.ceil((double) numConsumers / numProducers); // 稍微复杂点的毒丸分配,确保总数正确int remainingPills = numConsumers;for (int i = 0; i < numProducers; i++) {int pillsForThisProducer = (i == numProducers - 1) ? remainingPills : poisonPillsPerProducer;executorService.submit(new Producer(queue, itemsPerProducer, pillsForThisProducer, idCounter));remainingPills -= pillsForThisProducer;}// 关闭 ExecutorService,不再接受新任务executorService.shutdown();System.out.println("ExecutorService shutdown initiated. Waiting for tasks to complete...");// 等待所有任务完成 (或者超时)if (executorService.awaitTermination(1, TimeUnit.MINUTES)) {System.out.println("All producers and consumers finished.");} else {System.err.println("Simulation timed out.");executorService.shutdownNow(); // 尝试强制停止}}
}
代码亮点:
- 简洁性:使用
BlockingQueue
后,生产者和消费者的核心逻辑非常简单,就是put()
和take()
,无需手动处理锁和条件变量。 - 自动阻塞:队列满或空时的阻塞行为由
BlockingQueue
自动处理。 - 毒丸机制:演示了使用特殊对象(毒丸)来优雅地停止消费者线程的一种常用方法。生产者负责放入足够数量的毒丸。
- 线程池管理:使用
ExecutorService
来管理线程生命周期,比手动创建和join
线程更方便。
6.3 对比:使用非阻塞队列实现生产者-消费者
为了凸显 BlockingQueue
的价值,我们可以尝试用一个非阻塞队列(如 ConcurrentLinkedQueue
)加上手动的锁和条件变量来实现相同的生产者-消费者模式。这将清晰地展示出 BlockingQueue
为我们省去了多少麻烦。
ConcurrentLinkedQueue
是一个线程安全的、基于链表的无界非阻塞队列。它的 offer()
和 poll()
操作是非阻塞的。我们需要手动添加阻塞逻辑。
import java.util.LinkedList; // 使用 LinkedList 作为底层队列 (非线程安全)
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class ProducerConsumerWithNonBlockingQueue {private static final int POISON_PILL = -1;// 共享资源,封装队列和同步机制static class SharedResource {private final Queue<Integer> queue; // 使用 LinkedList (需要外部同步)private final int capacity;private final Lock lock = new ReentrantLock(); // 锁private final Condition notFull = lock.newCondition(); // 队列不满条件private final Condition notEmpty = lock.newCondition(); // 队列不空条件SharedResource(int capacity) {this.queue = new LinkedList<>(); // 注意:这里用 LinkedList 演示手动同步// 如果用 ConcurrentLinkedQueue,offer/poll 本身线程安全,但仍需 Conditionthis.capacity = capacity;}// 手动实现的 put (阻塞)public void put(Integer data) throws InterruptedException {lock.lockInterruptibly(); // 获取锁try {while (queue.size() >= capacity) { // 条件:队列满System.out.println(Thread.currentThread().getName() + " Queue is full. Waiting...");notFull.await(); // 在 notFull 上等待}queue.offer(data); // 入队 (因为持有锁,LinkedList 操作是安全的)// System.out.println(Thread.currentThread().getName() + " Added " + data + ", notifying consumers.");notEmpty.signalAll(); // 唤醒所有等待的消费者 (用 signalAll 更安全)} finally {lock.unlock(); // 释放锁}}// 手动实现的 take (阻塞)public Integer take() throws InterruptedException {lock.lockInterruptibly(); // 获取锁try {while (queue.isEmpty()) { // 条件:队列空System.out.println(Thread.currentThread().getName() + " Queue is empty. Waiting...");notEmpty.await(); // 在 notEmpty 上等待}Integer data = queue.poll(); // 出队// System.out.println(Thread.currentThread().getName() + " Took " + data + ", notifying producers.");notFull.signalAll(); // 唤醒所有等待的生产者return data;} finally {lock.unlock(); // 释放锁}}}// 生产者 Runnable (使用 SharedResource)static class Producer implements Runnable {private final SharedResource resource;// ... (其他成员变量同上一个例子) ...Producer(SharedResource resource, int productionCount, int poisonPillCount, AtomicInteger idCounter) {this.resource = resource;// ... 初始化其他成员 ...}@Overridepublic void run() {try {for (int i = 0; i < 15 /*productionCount*/; i++) { // 简化数量int data = 100 + i; // 简化数据生成System.out.println(Thread.currentThread().getName() + " Producing: " + data);resource.put(data); // 调用手动实现的 putTimeUnit.MILLISECONDS.sleep(100);}System.out.println(Thread.currentThread().getName() + " Finished producing. Putting poison pills...");for (int i = 0; i < 3 /*poisonPillCount*/; i++) { // 简化毒丸数量resource.put(POISON_PILL);}System.out.println(Thread.currentThread().getName() + " Put poison pills. Exiting.");} catch (InterruptedException e) { /* ... */ }}}// 消费者 Runnable (使用 SharedResource)static class Consumer implements Runnable {private final SharedResource resource;Consumer(SharedResource resource) { this.resource = resource; }@Overridepublic void run() {try {while (true) {Integer data = resource.take(); // 调用手动实现的 takeif (data == POISON_PILL) {System.out.println(Thread.currentThread().getName() + " Received poison pill. Exiting.");// 需要把毒丸放回去,让其他消费者也能收到!resource.put(POISON_PILL); break;}System.out.println(Thread.currentThread().getName() + " Consuming: " + data);TimeUnit.MILLISECONDS.sleep(200);}} catch (InterruptedException e) { /* ... */ }}}// Main 方法类似,只是创建 SharedResource 和对应的 Producer/Consumerpublic static void main(String[] args) throws InterruptedException {int queueCapacity = 5;int numProducers = 1;int numConsumers = 3;SharedResource resource = new SharedResource(queueCapacity);ExecutorService executorService = Executors.newCachedThreadPool();AtomicInteger idCounter = new AtomicInteger(0); // 未使用,简化System.out.println("Starting Non-Blocking Queue simulation...");for (int i = 0; i < numConsumers; i++) {executorService.submit(new Consumer(resource));}for (int i = 0; i < numProducers; i++) {// 生产者需要放入 numConsumers 个毒丸executorService.submit(new Producer(resource, 15, numConsumers, idCounter));}executorService.shutdown();executorService.awaitTermination(1, TimeUnit.MINUTES);System.out.println("Non-Blocking Queue simulation finished.");}
}
对比分析:
- 代码量增加:我们需要手动创建
SharedResource
类,管理Lock
和Condition
,并实现put
和take
的阻塞逻辑(while
循环、await
/signalAll
)。 - 易出错性增加:手动管理锁和条件变量非常容易出错。例如,忘记在
finally
中unlock()
,await()
前的条件判断错误,signal
/signalAll
调用时机错误或遗漏,都可能导致死锁、线程饥饿或数据不一致。 - 毒丸处理更复杂:在使用非阻塞队列手动实现时,当一个消费者拿到毒丸后,通常需要再把毒丸放回队列,以确保其他消费者也能收到并退出。而
BlockingQueue
的实现通常能更好地处理多个消费者对应毒丸的情况(如ArrayBlockingQueue
的signal
只唤醒一个,或者生产者放入足够的毒丸)。 - 性能考量:虽然
ConcurrentLinkedQueue
本身的offer
/poll
非常快(基于 CAS),但我们手动添加的Lock
和Condition
会引入额外的开销和锁竞争。相比之下,BlockingQueue
的原生实现(特别是LinkedBlockingQueue
,LinkedTransferQueue
)内部可能做了更多优化。
结论:BlockingQueue
极大地简化了生产者-消费者模式以及其他需要线程安全队列和阻塞功能的场景的实现,提高了代码的可读性、可维护性和健壮性,是 JUC 包提供的强大武器。
7. 高级话题与注意事项
7.1 公平性 (Fairness)
ArrayBlockingQueue
和SynchronousQueue
可以选择公平模式。- 公平模式下,锁的获取和条件的等待遵循 FIFO 原则,可以防止线程饥饿。
- 代价:公平性通常会牺牲吞吐量。因为需要维护等待队列,并且严格按顺序唤醒,上下文切换可能更频繁。
- 选择:大多数情况下,非公平锁性能更好,也足够用。只有在确实需要防止饥饿的场景下才考虑使用公平锁。
7.2 内存占用与 OOM 风险
ArrayBlockingQueue
:内存占用固定,创建时分配。优点是内存可预测,缺点是可能浪费空间或容量不足。LinkedBlockingQueue
(及其他基于链表的):内存按需分配。优点是灵活,缺点是:- 每个节点有额外开销(对象头、
next
指针)。 - 无界队列 (默认
LinkedBlockingQueue
,PriorityBlockingQueue
,DelayQueue
,LinkedTransferQueue
) 存在 OOM (OutOfMemoryError) 风险。如果生产者速度远快于消费者,队列会无限增长,最终耗尽堆内存。必须谨慎使用无界队列,确保消费者能力足够,或者有其他流量控制/背压机制。
- 每个节点有额外开销(对象头、
7.3 中断处理 (InterruptedException
)
-
BlockingQueue
的阻塞方法 (put
,take
, 带超时的offer
/poll
) 都是响应中断的。如果在这些方法阻塞等待时,线程的interrupt()
方法被调用,阻塞方法会抛出InterruptedException
。 -
最佳实践:捕获
InterruptedException
后,通常应该:- 执行必要的清理工作。
- 重新设置中断状态:调用
Thread.currentThread().interrupt()
。这很重要,因为捕获异常会清除中断状态,重新设置可以让更高层的调用者知道发生了中断。 - 可以选择向上抛出异常,或者根据业务逻辑结束当前任务/线程。
try {queue.put(data); } catch (InterruptedException e) {// 清理...Thread.currentThread().interrupt(); // 重新设置中断状态// 结束任务或向上抛出...System.err.println("Task interrupted during put.");return; // or throw new RuntimeException(e); }
7.4 BlockingQueue
与 ThreadPoolExecutor
ThreadPoolExecutor
是 Java 中创建自定义线程池的核心类,它就使用 BlockingQueue
来存储待执行的任务 (Runnable
或 Callable
)。
public ThreadPoolExecutor(int corePoolSize, // 核心线程数int maximumPoolSize, // 最大线程数long keepAliveTime, // 非核心线程空闲存活时间TimeUnit unit, // 时间单位BlockingQueue<Runnable> workQueue, // 工作队列ThreadFactory threadFactory, // 线程工厂RejectedExecutionHandler handler) // 拒绝策略
选择不同的 workQueue
对线程池的行为有显著影响:
LinkedBlockingQueue
(无界):当所有核心线程都在忙时,新任务会无限加入队列。这会导致maximumPoolSize
参数失效(线程数永远不会超过corePoolSize
),并且有 OOM 风险。Executors.newFixedThreadPool()
使用这种配置。ArrayBlockingQueue
(有界):当核心线程忙且队列满时,线程池会尝试创建新的线程(直到maximumPoolSize
),如果连最大线程数都满了,则执行拒绝策略。这种配置更可控,能防止资源耗尽。SynchronousQueue
(零容量):当所有核心线程都在忙时,新任务提交会立即尝试创建新线程(直到maximumPoolSize
)。如果达到最大线程数,则执行拒绝策略。这使得线程池非常灵活,能快速响应突发任务(创建新线程),并在空闲时回收线程(配合keepAliveTime
)。Executors.newCachedThreadPool()
使用这种配置。PriorityBlockingQueue
(无界):任务会按优先级执行。同样有 OOM 风险。
理解 BlockingQueue
的特性对于正确配置和使用 ThreadPoolExecutor
至关重要。
8. 总结
BlockingQueue
优雅解决了多线程环境下生产者-消费者模式中的核心挑战:线程安全、状态同步和线程通信。
通过将复杂的并发控制逻辑封装在队列内部,BlockingQueue
提供了简洁易用的接口,显著降低了并发编程的难度。
回顾:
BlockingQueue
的核心概念:阻塞特性、线程安全、多种操作策略。- 四组核心方法的区别与适用场景:抛异常、返回特殊值、阻塞、超时阻塞。
- 七种常见实现的内部原理、特性、优缺点和适用场景:
ArrayBlockingQueue
(有界数组,单锁)LinkedBlockingQueue
(可选有界链表,双锁,常用)PriorityBlockingQueue
(无界优先级堆,单锁)DelayQueue
(无界延迟队列,基于 PriorityQueue)SynchronousQueue
(零容量,直接传递)LinkedTransferQueue
(无界链表,transfer 语义,高性能)LinkedBlockingDeque
(双端队列,用于工作窃取)
- 底层实现原理:
ReentrantLock
和Condition
如何协同工作实现阻塞与唤醒。 - 选择指南:如何根据需求(有界/无界、性能、公平性、功能)选择合适的实现。
- 实战应用:批处理组件、经典生产者-消费者模式,以及与非阻塞队列实现的对比。
- 高级话题:公平性、内存风险、中断处理、与线程池的关系。
Happy coding!