轩辕李的博客 轩辕李的博客
首页
  • Java
  • Spring
  • 其他语言
  • 工具
  • HTML&CSS
  • JavaScript
  • 分布式
  • 代码质量管理
  • 基础
  • 操作系统
  • 计算机网络
  • 编程范式
  • 安全
  • 中间件
  • 心得
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

轩辕李

勇猛精进,星辰大海
首页
  • Java
  • Spring
  • 其他语言
  • 工具
  • HTML&CSS
  • JavaScript
  • 分布式
  • 代码质量管理
  • 基础
  • 操作系统
  • 计算机网络
  • 编程范式
  • 安全
  • 中间件
  • 心得
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Java

    • 核心

    • 并发

      • Java并发-线程基础与synchronized关键字
      • Java并发-重入锁ReentrantLock详解与实践
      • Java并发-信号量Semaphore
      • Java并发-读写锁ReadWriteLock
      • Java并发-倒计时器CountDownLatch
      • Java并发-栅栏CyclicBarrier
      • Java并发-LockSupport线程阻塞工具类
      • Java并发-线程池ThreadPoolExecutor
      • Java并发-阻塞队列BlockingQueue
        • 一、什么是阻塞队列
          • 1、生产者-消费者模式
        • 二、核心方法对比
          • 1、方法详解
        • 三、BlockingQueue的实现类
          • 1、常用实现类详解
          • 1.1、ArrayBlockingQueue
          • 1.2、LinkedBlockingQueue
          • 1.3、PriorityBlockingQueue
          • 1.4、DelayQueue
          • 1.5、SynchronousQueue
          • 1.6、LinkedTransferQueue
        • 四、完整示例:生产者-消费者模式
        • 五、实现原理探究
          • 1、核心组件
          • 2、阻塞机制原理
          • 3、LinkedBlockingQueue的双锁设计
        • 六、性能优化与最佳实践
          • 1、选择合适的队列实现
          • 2、性能调优建议
          • 3、常见问题与解决方案
          • 3.1、内存泄漏问题
          • 3.2、中断处理
          • 3.3、资源关闭
        • 七、高性能替代方案
          • 1、ConcurrentLinkedQueue
          • 2、Disruptor
          • 3、JCTools
        • 八、总结
        • 九、参考资料
      • Java并发-以空间换时间之ThreadLocal
      • Java并发-无锁策略CAS与atomic包
      • Java并发-JDK并发容器
      • Java并发-异步调用结果之Future和CompletableFuture
      • Java并发-Fork Join框架
      • Java并发-调试与诊断
    • 经验

    • JVM

    • 企业应用

  • Spring

  • 其他语言

  • 工具

  • 后端
  • Java
  • 并发
轩辕李
2018-07-01
目录

Java并发-阻塞队列BlockingQueue

BlockingQueue是Java并发包(java.util.concurrent)中的核心接口之一,在并发编程中扮演着重要角色。它继承自Queue接口,提供了线程安全的队列操作,特别适用于生产者-消费者模式。本文将深入探讨BlockingQueue的设计原理、使用场景、性能优化和最佳实践。

# 一、什么是阻塞队列

BlockingQueue顾名思义是一个支持阻塞操作的队列。它具有以下特点:

  • 线程安全:所有方法都是线程安全的,内部使用锁或其他并发控制机制
  • 阻塞操作:当队列为空时,获取操作会阻塞;当队列满时,插入操作会阻塞
  • 可选容量限制:可以创建有界或无界队列

# 1、生产者-消费者模式

以工厂流水线为例:

  • 生产者(Producer):负责生产数据并放入队列,如流水线上生产零件的工人A
  • 消费者(Consumer):从队列中取出数据进行处理,如组装零件的工人B

当生产速度和消费速度不匹配时:

  • 生产过快:队列满,生产者阻塞等待
  • 消费过快:队列空,消费者阻塞等待

# 二、核心方法对比

BlockingQueue提供了多组方法来处理不同场景:

操作 抛出异常 返回特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() - -

# 1、方法详解

插入操作:

  • put(e):将元素插入队列,如果队列满则阻塞等待
  • offer(e):尝试插入元素,成功返回true,队列满返回false,不阻塞
  • offer(e, time, unit):限时等待插入,超时返回false
  • add(e):插入元素,队列满时抛出IllegalStateException

移除操作:

  • take():获取并移除队首元素,队列空则阻塞等待
  • poll():尝试获取并移除队首元素,队列空返回null,不阻塞
  • poll(time, unit):限时等待获取,超时返回null
  • remove():移除队首元素,队列空时抛出NoSuchElementException

# 三、BlockingQueue的实现类

BlockingQueue是一个接口,Java提供了多种实现类,每种都有其特点和适用场景:

image

# 1、常用实现类详解

# 1.1、ArrayBlockingQueue

  • 底层结构:基于数组的有界阻塞队列
  • 特点:容量固定,公平性可选(默认非公平)
  • 适用场景:明确知道容量上限的场景
// 创建容量为10的队列
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

// 公平模式:线程按FIFO顺序访问队列
ArrayBlockingQueue<String> fairQueue = new ArrayBlockingQueue<>(10, true);

// 基本使用
queue.put("abc");  // 生产者线程
String item = queue.take();  // 消费者线程

# 1.2、LinkedBlockingQueue

  • 底层结构:基于链表的可选有界阻塞队列
  • 特点:默认容量Integer.MAX_VALUE(近似无界),吞吐量通常高于ArrayBlockingQueue
  • 适用场景:生产者消费者速度差异较大的场景
// 无界队列(实际上限为Integer.MAX_VALUE)
LinkedBlockingQueue<String> unboundedQueue = new LinkedBlockingQueue<>();

// 有界队列
LinkedBlockingQueue<String> boundedQueue = new LinkedBlockingQueue<>(100);

# 1.3、PriorityBlockingQueue

  • 底层结构:基于优先级堆的无界阻塞队列
  • 特点:元素按优先级排序,不保证同优先级元素的顺序
  • 适用场景:需要按优先级处理任务的场景
// 自定义比较器
PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(10, 
    (t1, t2) -> t1.getPriority() - t2.getPriority());

# 1.4、DelayQueue

  • 底层结构:基于PriorityQueue的无界阻塞队列
  • 特点:元素只有在延迟期满后才能被获取
  • 适用场景:缓存过期、定时任务调度
class DelayedTask implements Delayed {
    private final long delayTime;
    private final long expire;
    
    public DelayedTask(long delay) {
        this.delayTime = delay;
        this.expire = System.currentTimeMillis() + delay;
    }
    
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    
    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), 
                           o.getDelay(TimeUnit.MILLISECONDS));
    }
}

# 1.5、SynchronousQueue

  • 底层结构:不存储元素的阻塞队列
  • 特点:每个插入操作必须等待一个对应的移除操作
  • 适用场景:传递性场景,线程之间直接传递数据
SynchronousQueue<String> queue = new SynchronousQueue<>();
// 生产者线程
new Thread(() -> {
    try {
        queue.put("data"); // 阻塞直到有消费者接收
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}).start();

// 消费者线程
new Thread(() -> {
    try {
        String data = queue.take(); // 阻塞直到有生产者提供数据
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}).start();

# 1.6、LinkedTransferQueue

  • 底层结构:基于链表的无界阻塞队列
  • 特点:结合了LinkedBlockingQueue和SynchronousQueue的功能
  • 适用场景:需要更高性能的传递场景

# 四、完整示例:生产者-消费者模式

public class ProducerConsumerExample {
    private static final int QUEUE_CAPACITY = 5;
    private static final BlockingQueue<Integer> queue = 
        new ArrayBlockingQueue<>(QUEUE_CAPACITY);
    
    static class Producer implements Runnable {
        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; i++) {
                    System.out.println("生产: " + i);
                    queue.put(i);
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    static class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    Integer value = queue.take();
                    System.out.println("消费: " + value);
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    public static void main(String[] args) {
        Thread producer = new Thread(new Producer());
        Thread consumer = new Thread(new Consumer());
        
        producer.start();
        consumer.start();
    }
}

# 五、实现原理探究

如果你对BlockingQueue的实现原理感兴趣,通过查看ArrayBlockingQueue源码可以发现其核心机制:

# 1、核心组件

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    
    /** 存储元素的数组 */
    final Object[] items;
    
    /** 下一个take、poll、peek或remove操作的索引 */
    int takeIndex;
    
    /** 下一个put、offer或add操作的索引 */
    int putIndex;
    
    /** 队列中元素的数量 */
    int count;
    
    /** 主锁,保护所有访问 */
    final ReentrantLock lock;
    
    /** 等待take的条件 */
    private final Condition notEmpty;
    
    /** 等待put的条件 */
    private final Condition notFull;
}

# 2、阻塞机制原理

ArrayBlockingQueue使用ReentrantLock和Condition实现阻塞:

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();  // 队列满,等待
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();  // 队列空,等待
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();  // 通知等待的消费者
}

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    notFull.signal();  // 通知等待的生产者
    return x;
}

# 3、LinkedBlockingQueue的双锁设计

与ArrayBlockingQueue的单锁不同,LinkedBlockingQueue采用双锁设计以提高并发性能:

/** 用于put、offer等操作的锁 */
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();

/** 用于take、poll等操作的锁 */
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();

这种设计允许生产者和消费者同时操作队列的不同端,提高了吞吐量。

# 六、性能优化与最佳实践

# 1、选择合适的队列实现

队列类型 适用场景 性能特点
ArrayBlockingQueue 容量固定,公平性要求 内存连续,缓存友好
LinkedBlockingQueue 容量可变,高吞吐量 双锁设计,并发性好
SynchronousQueue 直接传递,不缓存 延迟低,无容量
PriorityBlockingQueue 任务调度,优先级处理 堆操作,O(log n)

# 2、性能调优建议

  1. 容量设置

    • 根据内存和业务需求合理设置队列容量
    • 避免过大容量导致内存浪费
    • 避免过小容量导致频繁阻塞
  2. 公平性选择

    • 非公平模式(默认):吞吐量高,可能出现线程饥饿
    • 公平模式:保证FIFO,吞吐量较低
  3. 批量操作

    // 使用drainTo批量获取,减少锁竞争
    List<String> buffer = new ArrayList<>();
    int count = queue.drainTo(buffer, 10);
    
  4. 超时控制

    // 避免无限等待
    if (queue.offer(item, 1, TimeUnit.SECONDS)) {
        // 成功
    } else {
        // 处理超时
    }
    

# 3、常见问题与解决方案

# 3.1、内存泄漏问题

// 错误:无界队列可能导致OOM
LinkedBlockingQueue<Task> queue = new LinkedBlockingQueue<>();

// 正确:设置合理的容量上限
LinkedBlockingQueue<Task> queue = new LinkedBlockingQueue<>(10000);

# 3.2、中断处理

try {
    queue.take();
} catch (InterruptedException e) {
    // 恢复中断状态
    Thread.currentThread().interrupt();
    // 清理资源
    return;
}

# 3.3、资源关闭

public class QueueService implements AutoCloseable {
    private final BlockingQueue<Task> queue = new ArrayBlockingQueue<>(100);
    private volatile boolean shutdown = false;
    
    @Override
    public void close() {
        shutdown = true;
        // 清空队列
        queue.clear();
    }
}

# 七、高性能替代方案

对于极高性能要求的场景,BlockingQueue的锁机制可能成为瓶颈。可以考虑以下替代方案:

# 1、ConcurrentLinkedQueue

  • 非阻塞队列,基于CAS操作
  • 适用于生产者消费者速度相近的场景
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("data");
String data = queue.poll(); // 不阻塞,可能返回null

# 2、Disruptor

  • LMAX开发的高性能队列框架
  • 无锁设计,性能极高
  • 适用于低延迟、高吞吐量场景
  • 详细介绍:高性能队列——Disruptor (opens new window)

# 3、JCTools

  • 提供多种高性能并发数据结构
  • 针对不同场景优化的队列实现
// SPSC: 单生产者单消费者
SpscArrayQueue<String> spscQueue = new SpscArrayQueue<>(1024);

// MPSC: 多生产者单消费者
MpscArrayQueue<String> mpscQueue = new MpscArrayQueue<>(1024);

# 八、总结

BlockingQueue是Java并发编程的重要工具,通过阻塞机制简化了生产者-消费者模式的实现。选择合适的实现类、理解其内部原理、遵循最佳实践,能够帮助我们构建高效、可靠的并发程序。在极端性能要求下,可以考虑无锁队列等替代方案。

# 九、参考资料

  • Java官方文档 - BlockingQueue (opens new window)
  • 高性能队列——Disruptor (opens new window)
  • JCTools GitHub (opens new window)
编辑 (opens new window)
#BlockingQueue
上次更新: 2025/08/15
Java并发-线程池ThreadPoolExecutor
Java并发-以空间换时间之ThreadLocal

← Java并发-线程池ThreadPoolExecutor Java并发-以空间换时间之ThreadLocal→

最近更新
01
AI时代的编程心得
09-11
02
Claude Code与Codex的协同工作
09-01
03
Claude Code实战之供应商切换工具
08-18
更多文章>
Theme by Vdoing | Copyright © 2018-2025 京ICP备2021021832号-2 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式