Java并发-作用域值与结构化并发
Java 21引入了虚拟线程(Virtual Threads),彻底改变了Java的并发编程模型。为了更好地配合虚拟线程,Java还引入了两个重要的并发特性:作用域值(Scoped Values) 和 结构化并发(Structured Concurrency)。
作用域值在Java 25中正式发布(JEP 506),而结构化并发目前仍处于预览阶段(JEP 505)。这两个特性相辅相成,共同构建了现代Java并发编程的新范式。
# 一、为什么需要新的并发特性?
# 1、ThreadLocal的问题
在传统的并发编程中,我们经常使用ThreadLocal来在线程间传递上下文数据。但ThreadLocal存在几个固有的设计缺陷:
无约束的可变性:任何能调用get()方法的代码都能调用set()方法,导致数据流难以追踪。
无界的生命周期:一旦设置了值,它会一直保留到线程结束或显式调用remove()。开发者经常忘记调用remove(),导致内存泄漏。
昂贵的继承开销:当使用InheritableThreadLocal时,子线程需要复制父线程的所有线程局部变量,在大量虚拟线程场景下会造成显著的内存开销。
# 2、非结构化并发的问题
使用ExecutorService进行并发编程时,任务之间的关系是松散的:
Response handle() throws ExecutionException, InterruptedException {
Future<String> user = executor.submit(() -> findUser());
Future<Integer> order = executor.submit(() -> fetchOrder());
String theUser = user.get(); // 阻塞等待
int theOrder = order.get(); // 阻塞等待
return new Response(theUser, theOrder);
}
这种方式存在以下问题:
- 如果
findUser()抛出异常,fetchOrder()仍会继续运行,造成线程泄漏 - 如果主线程被中断,子任务不会自动取消
- 如果
fetchOrder()失败,主线程仍会等待findUser()完成
# 二、作用域值(Scoped Values)
作用域值是ThreadLocal的现代替代方案,专为虚拟线程和结构化并发设计。
# 1、基本用法
// 声明作用域值(通常是static final)
private static final ScopedValue<String> USER = ScopedValue.newInstance();
// 绑定并运行
public void processRequest(String username) {
ScopedValue.where(USER, username).run(() -> {
// 在这个作用域内,USER绑定到username
handleRequest();
});
}
// 在调用链的任何位置读取
private void handleRequest() {
String currentUser = USER.get(); // 获取绑定的值
System.out.println("处理用户请求: " + currentUser);
doSomething();
}
private void doSomething() {
// 深层调用也能访问
String user = USER.get();
System.out.println("当前用户: " + user);
}
# 2、与ThreadLocal的对比
| 特性 | ThreadLocal | ScopedValue |
|---|---|---|
| 可变性 | 可随时修改 | 不可变(单次绑定) |
| 生命周期 | 无界(直到线程结束) | 有界(作用域结束自动清理) |
| 继承方式 | 复制(开销大) | 共享(零开销) |
| 数据流向 | 双向 | 单向(调用者→被调用者) |
| 性能 | 较慢 | 接近本地变量访问速度 |
# 3、重新绑定
在调用链中,可以为后续调用重新绑定作用域值:
private static final ScopedValue<String> CONTEXT = ScopedValue.newInstance();
void foo() {
ScopedValue.where(CONTEXT, "hello").run(() -> bar());
}
void bar() {
System.out.println(CONTEXT.get()); // 输出: hello
// 为后续调用重新绑定
ScopedValue.where(CONTEXT, "goodbye").run(() -> baz());
System.out.println(CONTEXT.get()); // 输出: hello(恢复原值)
}
void baz() {
System.out.println(CONTEXT.get()); // 输出: goodbye
}
# 4、多值绑定
可以同时绑定多个作用域值:
private static final ScopedValue<String> USER = ScopedValue.newInstance();
private static final ScopedValue<String> TENANT = ScopedValue.newInstance();
void process() {
ScopedValue.where(USER, "alice")
.where(TENANT, "company-a")
.run(() -> {
// 两个值都可用
System.out.println("用户: " + USER.get());
System.out.println("租户: " + TENANT.get());
});
}
# 5、返回值和异常处理
使用call()方法可以返回值或抛出异常:
private static final ScopedValue<Connection> DB_CONN = ScopedValue.newInstance();
String queryUser(Connection conn, String userId) throws Exception {
return ScopedValue.where(DB_CONN, conn).call(() -> {
return executeQuery(userId);
});
}
private String executeQuery(String userId) throws SQLException {
Connection conn = DB_CONN.get();
// 执行查询...
return "user-data";
}
# 6、检查绑定状态
if (USER.isBound()) {
String user = USER.get();
// 处理...
} else {
// 未绑定时的处理
}
// 或使用orElse提供默认值
String user = USER.orElse("anonymous");
# 三、结构化并发(Structured Concurrency)
结构化并发将并发任务组织成清晰的层次结构,使得任务的生命周期与代码的词法作用域保持一致。
# 1、基本用法
Response handle() throws InterruptedException {
try (var scope = StructuredTaskScope.open()) {
// 分叉子任务
Subtask<String> user = scope.fork(() -> findUser());
Subtask<Integer> order = scope.fork(() -> fetchOrder());
// 等待所有子任务完成
scope.join();
// 处理结果
return new Response(user.get(), order.get());
}
}
# 2、核心优势
错误处理与短路:如果任一子任务失败,其他子任务会被自动取消。
取消传播:如果主线程被中断,所有子任务会自动取消。
清晰的结构:任务的生命周期由代码块明确界定。
可观测性:线程转储可以清晰显示任务层次结构。
# 3、Joiner策略
StructuredTaskScope支持不同的完成策略:
默认策略(任一失败则失败):
try (var scope = StructuredTaskScope.open()) {
scope.fork(() -> task1());
scope.fork(() -> task2());
scope.join(); // 任一失败则抛出异常
}
任一成功即返回:
<T> T race(Collection<Callable<T>> tasks) throws InterruptedException {
try (var scope = StructuredTaskScope.open(
Joiner.<T>anySuccessfulResultOrThrow())) {
tasks.forEach(scope::fork);
return scope.join(); // 返回第一个成功的结果
}
}
收集所有成功结果:
<T> List<T> runAll(Collection<Callable<T>> tasks) throws InterruptedException {
try (var scope = StructuredTaskScope.open(
Joiner.<T>allSuccessfulOrThrow())) {
tasks.forEach(scope::fork);
return scope.join()
.map(Subtask::get)
.toList();
}
}
# 4、超时控制
<T> List<T> runWithTimeout(Collection<Callable<T>> tasks, Duration timeout)
throws InterruptedException {
try (var scope = StructuredTaskScope.open(
Joiner.<T>allSuccessfulOrThrow(),
cf -> cf.withTimeout(timeout))) {
tasks.forEach(scope::fork);
return scope.join().map(Subtask::get).toList();
}
}
# 5、自定义Joiner
JDK内置的Joiner策略(如anySuccessfulResultOrThrow、allSuccessfulOrThrow)能满足大多数场景,但有时我们需要更灵活的控制。这时可以实现自定义的Joiner。
Joiner接口的核心方法:
public interface Joiner<T, R> {
// 每当一个子任务完成时调用(成功、失败或取消都会触发)
// 返回true表示取消作用域中所有剩余任务,返回false表示继续等待
boolean onComplete(Subtask<? extends T> subtask);
// 所有任务完成后,返回最终结果
R result() throws X;
}
理解执行流程:
fork(task1) ──┐
fork(task2) ──┼──> join() 等待 ──> 每个任务完成时调用 onComplete()
fork(task3) ──┘ │
↓
所有任务完成后调用 result()
│
↓
返回最终结果
示例:收集所有成功结果(忽略失败)
假设我们有多个服务调用,希望收集所有成功的结果,忽略失败的:
class CollectSuccessJoiner<T> implements Joiner<T, Stream<T>> {
// 使用线程安全的队列存储成功结果
private final Queue<T> results = new ConcurrentLinkedQueue<>();
@Override
public boolean onComplete(Subtask<? extends T> subtask) {
// 只收集成功的结果
if (subtask.state() == Subtask.State.SUCCESS) {
results.add(subtask.get());
}
// 返回false:不取消其他任务,继续等待所有任务完成
// 如果返回true:会立即取消所有未完成的任务
return false;
}
@Override
public Stream<T> result() {
// join()完成后返回收集到的所有成功结果
return results.stream();
}
}
使用自定义Joiner:
List<String> fetchFromMultipleServices() throws InterruptedException {
try (var scope = StructuredTaskScope.open(new CollectSuccessJoiner<String>())) {
// 分叉多个任务
scope.fork(() -> callServiceA()); // 可能成功
scope.fork(() -> callServiceB()); // 可能失败
scope.fork(() -> callServiceC()); // 可能成功
// join()会等待所有任务完成,然后调用result()
return scope.join().toList(); // 只返回成功的结果
}
}
示例:达到N个成功结果后提前返回
class FirstNSuccessJoiner<T> implements Joiner<T, List<T>> {
private final int targetCount;
private final Queue<T> results = new ConcurrentLinkedQueue<>();
private final AtomicInteger successCount = new AtomicInteger(0);
public FirstNSuccessJoiner(int n) {
this.targetCount = n;
}
@Override
public boolean onComplete(Subtask<? extends T> subtask) {
if (subtask.state() == Subtask.State.SUCCESS) {
results.add(subtask.get());
// 达到目标数量,取消剩余任务
if (successCount.incrementAndGet() >= targetCount) {
return true; // 返回true取消所有未完成的任务
}
}
return false;
}
@Override
public List<T> result() {
return new ArrayList<>(results);
}
}
// 使用:从10个服务中获取最先成功的3个结果
List<String> results = StructuredTaskScope.open(new FirstNSuccessJoiner<>(3));
onComplete返回值的作用:
return false:继续等待其他任务完成return true:立即取消所有未完成的任务,join()随后返回
# 四、作用域值与结构化并发的协作
作用域值可以自动被结构化并发中的子任务继承:
private static final ScopedValue<FrameworkContext> CONTEXT = ScopedValue.newInstance();
void serve(Request request, Response response) {
var context = createContext(request);
ScopedValue.where(CONTEXT, context).run(() -> {
try (var scope = StructuredTaskScope.open()) {
// 子任务自动继承CONTEXT的绑定
Subtask<UserInfo> user = scope.fork(() -> readUserInfo());
Subtask<List<Offer>> offers = scope.fork(() -> fetchOffers());
scope.join();
// 处理结果...
}
});
}
UserInfo readUserInfo() {
// 可以直接访问父任务绑定的上下文
var context = CONTEXT.get();
return queryUser(context);
}
# 五、实战示例:Web框架上下文传递
public class WebFramework {
private static final ScopedValue<RequestContext> REQUEST_CTX = ScopedValue.newInstance();
public void handleRequest(HttpRequest request) {
var ctx = new RequestContext(request.getUserId(), request.getTraceId());
ScopedValue.where(REQUEST_CTX, ctx).run(() -> {
try (var scope = StructuredTaskScope.open()) {
var userTask = scope.fork(() -> userService.getUser());
var permTask = scope.fork(() -> permissionService.getPermissions());
scope.join();
var response = buildResponse(userTask.get(), permTask.get());
sendResponse(response);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
sendError("Request interrupted");
}
});
}
// 在服务层可以直接获取上下文
public static RequestContext currentContext() {
return REQUEST_CTX.get();
}
}
record RequestContext(String userId, String traceId) {}
# 六、迁移建议
# 从ThreadLocal迁移到ScopedValue
适合迁移的场景:
- 数据单向传递(调用者→被调用者)
- 数据在作用域内不需要修改
- 需要在虚拟线程中使用
不适合迁移的场景:
- 需要双向数据传递
- 需要在调用链中修改值
- 缓存昂贵对象(如
SimpleDateFormat)
# 从ExecutorService迁移到StructuredTaskScope
适合迁移的场景:
- 任务有明确的父子关系
- 需要统一的错误处理和取消机制
- 使用虚拟线程
保留ExecutorService的场景:
- 任务之间没有层次关系
- 需要任务队列和调度功能
- 需要与现有代码兼容
# 七、总结
| 特性 | 状态 | 主要用途 |
|---|---|---|
| 作用域值 | Java 25正式版 | 替代ThreadLocal,安全高效地传递上下文 |
| 结构化并发 | Java 25预览版 | 组织并发任务,统一生命周期管理 |
作用域值和结构化并发代表了Java并发编程的未来方向。它们与虚拟线程完美配合,提供了更安全、更高效、更易于理解的并发编程模型。
# 参考资料
- JEP 506: Scoped Values (opens new window)
- JEP 505: Structured Concurrency (Fifth Preview) (opens new window)
- JEP 444: Virtual Threads (opens new window)
- Structured Concurrency and Scoped Values in Java (opens new window)
祝你变得更强!