Java并发-栅栏CyclicBarrier
话说北京有个地儿叫大栅栏,在前门前,天桥下,过了天桥就到了天坛。这个大栅栏标准读音是da zha lan
,但如果你非得这么念可能要遭到本地人笑话的,正确的土话读法是:da she lan er
。
言归正传,今天讲到的工具叫CyclicBarrier
,直译过来就叫做栅栏。栅栏其实就是口袋,玉米装满口袋就需要扎口。
# 一、简介
CyclicBarrier
是 Java 并发包中的一个同步辅助类,它允许一组线程相互等待,直到所有线程都到达某个公共屏障点(barrier point)。在涉及固定大小的线程组必须相互等待的程序中,CyclicBarrier
非常有用。之所以叫 Cyclic(循环),是因为在释放等待线程后,它可以被重用。
# 1、核心特性
- 屏障点同步:所有参与线程必须到达屏障点才能继续执行
- 可循环使用:屏障释放后可以重置并再次使用
- 屏障动作:可以在所有线程到达屏障时执行一个预定义的动作
- BrokenBarrierException:如果任何线程在等待期间被中断或超时,所有等待线程都会收到此异常
# 二、基本使用示例
下面通过一个扎口袋的例子初步认识一下CyclicBarrier
:
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class CyclicBarrierDemo {
// 创建一个容量为10的CyclicBarrier,当10个线程都到达屏障点时执行扎口袋动作
final static CyclicBarrier BARRIER = new CyclicBarrier(10, () -> {
System.out.println("口袋装满了,开始扎口袋!");
});
static class Maize implements Runnable {
final static AtomicInteger counter = new AtomicInteger();
@Override
public void run() {
try {
int maizeNo = counter.getAndIncrement();
System.out.println(Thread.currentThread().getName() + ":装" + maizeNo + "号玉米进口袋");
// 模拟装玉米的时间
Thread.sleep(new Random().nextInt(10) * 100);
System.out.println(Thread.currentThread().getName() + ":" + maizeNo + "号玉米装好了,等待其他玉米...");
// 等待其他线程到达屏障点
BARRIER.await();
System.out.println(Thread.currentThread().getName() + ":扎口袋完成,继续下一轮");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executorService.execute(new Maize());
}
executorService.shutdown();
}
}
# 1、工作原理
CyclicBarrier
的工作流程是:
- 每个线程执行自己的任务(装玉米)
- 调用
await()
方法,表示已到达屏障点 - 线程在屏障点等待,直到所有线程都到达
- 当最后一个线程到达时,执行屏障动作(扎口袋)
- 所有等待的线程被释放,继续执行后续代码
# 三、的主要方法
# 1、构造方法
// 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动
public CyclicBarrier(int parties)
// 创建一个新的 CyclicBarrier,并在所有线程到达屏障时执行给定的屏障操作
public CyclicBarrier(int parties, Runnable barrierAction)
# 2、核心方法
// 在屏障处等待,直到所有参与者都调用了此方法
public int await() throws InterruptedException, BrokenBarrierException
// 在屏障处等待,直到所有参与者都调用了此方法,或者超时
public int await(long timeout, TimeUnit unit)
throws InterruptedException, BrokenBarrierException, TimeoutException
// 将屏障重置为其初始状态
public void reset()
// 查询是否处于破损状态
public boolean isBroken()
// 返回当前在屏障处等待的参与者数量
public int getNumberWaiting()
// 返回要求启动此 barrier 的参与者数量
public int getParties()
# 四、vs CountDownLatch
虽然 CyclicBarrier
和 CountDownLatch
看起来相似,但它们有显著的区别:
特性 | CyclicBarrier | CountDownLatch |
---|---|---|
重用性 | 可以重复使用(reset) | 一次性使用,计数到0后不能重置 |
等待机制 | 所有线程互相等待 | 一个或多个线程等待其他线程 |
触发动作 | 可以在所有线程到达时执行特定动作 | 没有此机制 |
参与方式 | 调用 await() 的线程会阻塞 | 调用 countDown() 的线程不阻塞 |
适用场景 | 多线程计算,分治算法 | 主线程等待多个工作线程完成 |
异常处理 | 一个线程中断会影响所有等待线程 | 不会相互影响 |
# 五、循环使用示例
CyclicBarrier
最大的特点是可以循环使用:
public class CyclicReuseDemo {
public static void main(String[] args) throws InterruptedException {
int rounds = 3; // 进行3轮比赛
int players = 4; // 4个选手
CyclicBarrier barrier = new CyclicBarrier(players, () -> {
System.out.println("====== 所有选手已就位,开始新一轮比赛!======");
});
ExecutorService executor = Executors.newFixedThreadPool(players);
for (int round = 1; round <= rounds; round++) {
System.out.println("\n第" + round + "轮比赛准备中...");
for (int i = 1; i <= players; i++) {
final int playerNo = i;
final int currentRound = round;
executor.execute(() -> {
try {
Thread.sleep(new Random().nextInt(2000));
System.out.println("第" + currentRound + "轮:选手" + playerNo + "已准备就绪");
barrier.await();
System.out.println("第" + currentRound + "轮:选手" + playerNo + "开始比赛!");
} catch (Exception e) {
e.printStackTrace();
}
});
}
Thread.sleep(3000); // 等待本轮结束
}
executor.shutdown();
}
}
# 六、实际应用场景
# 1、并行计算任务
public class ParallelComputing {
public static void main(String[] args) {
int threadCount = 4;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("所有子任务计算完成,开始汇总结果...");
});
for (int i = 0; i < threadCount; i++) {
final int taskId = i;
new Thread(() -> {
try {
// 执行计算任务
System.out.println("任务" + taskId + "开始计算...");
Thread.sleep(new Random().nextInt(3000));
System.out.println("任务" + taskId + "计算完成");
// 等待其他任务
barrier.await();
// 继续后续处理
System.out.println("任务" + taskId + "继续处理汇总后的数据");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
# 2、多线程分步骤任务
public class MultiStepTask {
public static void main(String[] args) {
int workerCount = 3;
// 第一步:数据准备
CyclicBarrier prepareBarrier = new CyclicBarrier(workerCount, () -> {
System.out.println("=== 所有数据准备完成 ===");
});
// 第二步:数据处理
CyclicBarrier processBarrier = new CyclicBarrier(workerCount, () -> {
System.out.println("=== 所有数据处理完成 ===");
});
// 第三步:结果输出
CyclicBarrier outputBarrier = new CyclicBarrier(workerCount, () -> {
System.out.println("=== 所有结果输出完成 ===");
});
for (int i = 0; i < workerCount; i++) {
final int workerId = i;
new Thread(() -> {
try {
// 步骤1:准备数据
System.out.println("Worker-" + workerId + " 准备数据中...");
Thread.sleep(new Random().nextInt(2000));
prepareBarrier.await();
// 步骤2:处理数据
System.out.println("Worker-" + workerId + " 处理数据中...");
Thread.sleep(new Random().nextInt(2000));
processBarrier.await();
// 步骤3:输出结果
System.out.println("Worker-" + workerId + " 输出结果中...");
Thread.sleep(new Random().nextInt(2000));
outputBarrier.await();
System.out.println("Worker-" + workerId + " 全部任务完成!");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
# 七、异常处理
使用 CyclicBarrier
时需要注意异常处理:
public class BarrierExceptionDemo {
public static void main(String[] args) throws InterruptedException {
int parties = 3;
CyclicBarrier barrier = new CyclicBarrier(parties);
// 正常线程
Thread t1 = new Thread(() -> {
try {
System.out.println("线程1等待中...");
barrier.await();
System.out.println("线程1继续执行");
} catch (BrokenBarrierException e) {
System.out.println("线程1:屏障被破坏!");
} catch (InterruptedException e) {
System.out.println("线程1被中断");
}
});
// 会超时的线程
Thread t2 = new Thread(() -> {
try {
System.out.println("线程2等待中(设置超时)...");
barrier.await(2, TimeUnit.SECONDS);
System.out.println("线程2继续执行");
} catch (TimeoutException e) {
System.out.println("线程2:等待超时!");
} catch (BrokenBarrierException e) {
System.out.println("线程2:屏障被破坏!");
} catch (InterruptedException e) {
System.out.println("线程2被中断");
}
});
t1.start();
t2.start();
// 等待一段时间后,第三个线程尝试使用已破损的屏障
Thread.sleep(3000);
Thread t3 = new Thread(() -> {
try {
System.out.println("线程3尝试使用屏障...");
System.out.println("屏障是否破损:" + barrier.isBroken());
barrier.await();
} catch (BrokenBarrierException e) {
System.out.println("线程3:屏障已经破损,无法使用!");
} catch (InterruptedException e) {
System.out.println("线程3被中断");
}
});
t3.start();
}
}
# 八、注意事项
- 屏障破损:如果有线程在等待时被中断或超时,屏障会被破损,所有等待的线程都会抛出
BrokenBarrierException
- 死锁风险:如果线程数少于屏障设定的数量,会导致永久等待
- 重用时机:只有在所有线程都通过屏障后,才能安全地重用屏障
- 性能考虑:大量线程等待时会消耗系统资源,需要合理设置线程数
# 九、总结
CyclicBarrier
是一个强大的同步工具,特别适合以下场景:
- 需要多个线程相互等待到达同一个同步点
- 需要在所有线程到达后执行某个特定操作
- 需要重复使用同步屏障的场景
- 多线程分阶段执行任务
相比 CountDownLatch
,CyclicBarrier
提供了更灵活的循环使用能力和屏障动作机制,但使用时也需要更加注意异常处理和死锁风险。选择使用哪个工具,应该根据具体的业务场景和需求来决定。
编辑 (opens new window)
上次更新: 2025/08/14