轩辕李的博客 轩辕李的博客
首页
  • Java
  • Spring
  • 其他语言
  • 工具
  • JavaScript
  • TypeScript
  • Node.js
  • Vue.js
  • 前端工程化
  • 浏览器与Web API
  • 架构设计与模式
  • 代码质量管理
  • 基础
  • 操作系统
  • 计算机网络
  • 编程范式
  • 安全
  • 中间件
  • 心得
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

轩辕李

勇猛精进,星辰大海
首页
  • Java
  • Spring
  • 其他语言
  • 工具
  • JavaScript
  • TypeScript
  • Node.js
  • Vue.js
  • 前端工程化
  • 浏览器与Web API
  • 架构设计与模式
  • 代码质量管理
  • 基础
  • 操作系统
  • 计算机网络
  • 编程范式
  • 安全
  • 中间件
  • 心得
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Java

    • 核心

    • 并发

      • Java并发-线程基础与synchronized关键字
      • Java并发-重入锁ReentrantLock详解与实践
      • Java并发-信号量Semaphore
      • Java并发-读写锁ReadWriteLock
      • Java并发-倒计时器CountDownLatch
      • Java并发-栅栏CyclicBarrier
      • Java并发-LockSupport线程阻塞工具类
      • Java并发-线程池ThreadPoolExecutor
      • Java并发-阻塞队列BlockingQueue
      • Java并发-以空间换时间之ThreadLocal
      • Java并发-无锁策略CAS与atomic包
      • Java并发-JDK并发容器
      • Java并发-异步调用结果之Future和CompletableFuture
      • Java并发-Fork Join框架
      • Java并发-作用域值与结构化并发
        • 一、为什么需要新的并发特性?
          • 1、ThreadLocal的问题
          • 2、非结构化并发的问题
        • 二、作用域值(Scoped Values)
          • 1、基本用法
          • 2、与ThreadLocal的对比
          • 3、重新绑定
          • 4、多值绑定
          • 5、返回值和异常处理
          • 6、检查绑定状态
        • 三、结构化并发(Structured Concurrency)
          • 1、基本用法
          • 2、核心优势
          • 3、Joiner策略
          • 4、超时控制
          • 5、自定义Joiner
        • 四、作用域值与结构化并发的协作
        • 五、实战示例:Web框架上下文传递
        • 六、迁移建议
          • 从ThreadLocal迁移到ScopedValue
          • 从ExecutorService迁移到StructuredTaskScope
        • 七、总结
        • 参考资料
      • Java并发-调试与诊断
    • 经验

    • JVM

    • 企业应用

  • Spring

  • 其他语言

  • 工具

  • 后端
  • Java
  • 并发
轩辕李
2025-03-21
目录

Java并发-作用域值与结构化并发

Java 21引入了虚拟线程(Virtual Threads),彻底改变了Java的并发编程模型。为了更好地配合虚拟线程,Java还引入了两个重要的并发特性:作用域值(Scoped Values) 和 结构化并发(Structured Concurrency)。

作用域值在Java 25中正式发布(JEP 506),而结构化并发目前仍处于预览阶段(JEP 505)。这两个特性相辅相成,共同构建了现代Java并发编程的新范式。

# 一、为什么需要新的并发特性?

# 1、ThreadLocal的问题

在传统的并发编程中,我们经常使用ThreadLocal来在线程间传递上下文数据。但ThreadLocal存在几个固有的设计缺陷:

无约束的可变性:任何能调用get()方法的代码都能调用set()方法,导致数据流难以追踪。

无界的生命周期:一旦设置了值,它会一直保留到线程结束或显式调用remove()。开发者经常忘记调用remove(),导致内存泄漏。

昂贵的继承开销:当使用InheritableThreadLocal时,子线程需要复制父线程的所有线程局部变量,在大量虚拟线程场景下会造成显著的内存开销。

# 2、非结构化并发的问题

使用ExecutorService进行并发编程时,任务之间的关系是松散的:

Response handle() throws ExecutionException, InterruptedException {
    Future<String> user = executor.submit(() -> findUser());
    Future<Integer> order = executor.submit(() -> fetchOrder());
    String theUser = user.get();   // 阻塞等待
    int theOrder = order.get();    // 阻塞等待
    return new Response(theUser, theOrder);
}

这种方式存在以下问题:

  • 如果findUser()抛出异常,fetchOrder()仍会继续运行,造成线程泄漏
  • 如果主线程被中断,子任务不会自动取消
  • 如果fetchOrder()失败,主线程仍会等待findUser()完成

# 二、作用域值(Scoped Values)

作用域值是ThreadLocal的现代替代方案,专为虚拟线程和结构化并发设计。

# 1、基本用法

// 声明作用域值(通常是static final)
private static final ScopedValue<String> USER = ScopedValue.newInstance();

// 绑定并运行
public void processRequest(String username) {
    ScopedValue.where(USER, username).run(() -> {
        // 在这个作用域内,USER绑定到username
        handleRequest();
    });
}

// 在调用链的任何位置读取
private void handleRequest() {
    String currentUser = USER.get();  // 获取绑定的值
    System.out.println("处理用户请求: " + currentUser);
    doSomething();
}

private void doSomething() {
    // 深层调用也能访问
    String user = USER.get();
    System.out.println("当前用户: " + user);
}

# 2、与ThreadLocal的对比

特性 ThreadLocal ScopedValue
可变性 可随时修改 不可变(单次绑定)
生命周期 无界(直到线程结束) 有界(作用域结束自动清理)
继承方式 复制(开销大) 共享(零开销)
数据流向 双向 单向(调用者→被调用者)
性能 较慢 接近本地变量访问速度

# 3、重新绑定

在调用链中,可以为后续调用重新绑定作用域值:

private static final ScopedValue<String> CONTEXT = ScopedValue.newInstance();

void foo() {
    ScopedValue.where(CONTEXT, "hello").run(() -> bar());
}

void bar() {
    System.out.println(CONTEXT.get()); // 输出: hello
    
    // 为后续调用重新绑定
    ScopedValue.where(CONTEXT, "goodbye").run(() -> baz());
    
    System.out.println(CONTEXT.get()); // 输出: hello(恢复原值)
}

void baz() {
    System.out.println(CONTEXT.get()); // 输出: goodbye
}

# 4、多值绑定

可以同时绑定多个作用域值:

private static final ScopedValue<String> USER = ScopedValue.newInstance();
private static final ScopedValue<String> TENANT = ScopedValue.newInstance();

void process() {
    ScopedValue.where(USER, "alice")
               .where(TENANT, "company-a")
               .run(() -> {
                   // 两个值都可用
                   System.out.println("用户: " + USER.get());
                   System.out.println("租户: " + TENANT.get());
               });
}

# 5、返回值和异常处理

使用call()方法可以返回值或抛出异常:

private static final ScopedValue<Connection> DB_CONN = ScopedValue.newInstance();

String queryUser(Connection conn, String userId) throws Exception {
    return ScopedValue.where(DB_CONN, conn).call(() -> {
        return executeQuery(userId);
    });
}

private String executeQuery(String userId) throws SQLException {
    Connection conn = DB_CONN.get();
    // 执行查询...
    return "user-data";
}

# 6、检查绑定状态

if (USER.isBound()) {
    String user = USER.get();
    // 处理...
} else {
    // 未绑定时的处理
}

// 或使用orElse提供默认值
String user = USER.orElse("anonymous");

# 三、结构化并发(Structured Concurrency)

结构化并发将并发任务组织成清晰的层次结构,使得任务的生命周期与代码的词法作用域保持一致。

# 1、基本用法

Response handle() throws InterruptedException {
    try (var scope = StructuredTaskScope.open()) {
        // 分叉子任务
        Subtask<String> user = scope.fork(() -> findUser());
        Subtask<Integer> order = scope.fork(() -> fetchOrder());
        
        // 等待所有子任务完成
        scope.join();
        
        // 处理结果
        return new Response(user.get(), order.get());
    }
}

# 2、核心优势

错误处理与短路:如果任一子任务失败,其他子任务会被自动取消。

取消传播:如果主线程被中断,所有子任务会自动取消。

清晰的结构:任务的生命周期由代码块明确界定。

可观测性:线程转储可以清晰显示任务层次结构。

# 3、Joiner策略

StructuredTaskScope支持不同的完成策略:

默认策略(任一失败则失败):

try (var scope = StructuredTaskScope.open()) {
    scope.fork(() -> task1());
    scope.fork(() -> task2());
    scope.join();  // 任一失败则抛出异常
}

任一成功即返回:

<T> T race(Collection<Callable<T>> tasks) throws InterruptedException {
    try (var scope = StructuredTaskScope.open(
            Joiner.<T>anySuccessfulResultOrThrow())) {
        tasks.forEach(scope::fork);
        return scope.join();  // 返回第一个成功的结果
    }
}

收集所有成功结果:

<T> List<T> runAll(Collection<Callable<T>> tasks) throws InterruptedException {
    try (var scope = StructuredTaskScope.open(
            Joiner.<T>allSuccessfulOrThrow())) {
        tasks.forEach(scope::fork);
        return scope.join()
                    .map(Subtask::get)
                    .toList();
    }
}

# 4、超时控制

<T> List<T> runWithTimeout(Collection<Callable<T>> tasks, Duration timeout) 
        throws InterruptedException {
    try (var scope = StructuredTaskScope.open(
            Joiner.<T>allSuccessfulOrThrow(),
            cf -> cf.withTimeout(timeout))) {
        tasks.forEach(scope::fork);
        return scope.join().map(Subtask::get).toList();
    }
}

# 5、自定义Joiner

JDK内置的Joiner策略(如anySuccessfulResultOrThrow、allSuccessfulOrThrow)能满足大多数场景,但有时我们需要更灵活的控制。这时可以实现自定义的Joiner。

Joiner接口的核心方法:

public interface Joiner<T, R> {
    // 每当一个子任务完成时调用(成功、失败或取消都会触发)
    // 返回true表示取消作用域中所有剩余任务,返回false表示继续等待
    boolean onComplete(Subtask<? extends T> subtask);

    // 所有任务完成后,返回最终结果
    R result() throws X;
}

理解执行流程:

fork(task1) ──┐
fork(task2) ──┼──> join() 等待 ──> 每个任务完成时调用 onComplete()
fork(task3) ──┘                          │
                                         ↓
                              所有任务完成后调用 result()
                                         │
                                         ↓
                                   返回最终结果

示例:收集所有成功结果(忽略失败)

假设我们有多个服务调用,希望收集所有成功的结果,忽略失败的:

class CollectSuccessJoiner<T> implements Joiner<T, Stream<T>> {
    // 使用线程安全的队列存储成功结果
    private final Queue<T> results = new ConcurrentLinkedQueue<>();

    @Override
    public boolean onComplete(Subtask<? extends T> subtask) {
        // 只收集成功的结果
        if (subtask.state() == Subtask.State.SUCCESS) {
            results.add(subtask.get());
        }
        // 返回false:不取消其他任务,继续等待所有任务完成
        // 如果返回true:会立即取消所有未完成的任务
        return false;
    }

    @Override
    public Stream<T> result() {
        // join()完成后返回收集到的所有成功结果
        return results.stream();
    }
}

使用自定义Joiner:

List<String> fetchFromMultipleServices() throws InterruptedException {
    try (var scope = StructuredTaskScope.open(new CollectSuccessJoiner<String>())) {
        // 分叉多个任务
        scope.fork(() -> callServiceA());  // 可能成功
        scope.fork(() -> callServiceB());  // 可能失败
        scope.fork(() -> callServiceC());  // 可能成功

        // join()会等待所有任务完成,然后调用result()
        return scope.join().toList();  // 只返回成功的结果
    }
}

示例:达到N个成功结果后提前返回

class FirstNSuccessJoiner<T> implements Joiner<T, List<T>> {
    private final int targetCount;
    private final Queue<T> results = new ConcurrentLinkedQueue<>();
    private final AtomicInteger successCount = new AtomicInteger(0);

    public FirstNSuccessJoiner(int n) {
        this.targetCount = n;
    }

    @Override
    public boolean onComplete(Subtask<? extends T> subtask) {
        if (subtask.state() == Subtask.State.SUCCESS) {
            results.add(subtask.get());
            // 达到目标数量,取消剩余任务
            if (successCount.incrementAndGet() >= targetCount) {
                return true;  // 返回true取消所有未完成的任务
            }
        }
        return false;
    }

    @Override
    public List<T> result() {
        return new ArrayList<>(results);
    }
}

// 使用:从10个服务中获取最先成功的3个结果
List<String> results = StructuredTaskScope.open(new FirstNSuccessJoiner<>(3));

onComplete返回值的作用:

  • return false:继续等待其他任务完成
  • return true:立即取消所有未完成的任务,join()随后返回

# 四、作用域值与结构化并发的协作

作用域值可以自动被结构化并发中的子任务继承:

private static final ScopedValue<FrameworkContext> CONTEXT = ScopedValue.newInstance();

void serve(Request request, Response response) {
    var context = createContext(request);
    ScopedValue.where(CONTEXT, context).run(() -> {
        try (var scope = StructuredTaskScope.open()) {
            // 子任务自动继承CONTEXT的绑定
            Subtask<UserInfo> user = scope.fork(() -> readUserInfo());
            Subtask<List<Offer>> offers = scope.fork(() -> fetchOffers());
            scope.join();
            // 处理结果...
        }
    });
}

UserInfo readUserInfo() {
    // 可以直接访问父任务绑定的上下文
    var context = CONTEXT.get();
    return queryUser(context);
}

# 五、实战示例:Web框架上下文传递

public class WebFramework {
    private static final ScopedValue<RequestContext> REQUEST_CTX = ScopedValue.newInstance();
    
    public void handleRequest(HttpRequest request) {
        var ctx = new RequestContext(request.getUserId(), request.getTraceId());
        
        ScopedValue.where(REQUEST_CTX, ctx).run(() -> {
            try (var scope = StructuredTaskScope.open()) {
                var userTask = scope.fork(() -> userService.getUser());
                var permTask = scope.fork(() -> permissionService.getPermissions());
                
                scope.join();
                
                var response = buildResponse(userTask.get(), permTask.get());
                sendResponse(response);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                sendError("Request interrupted");
            }
        });
    }
    
    // 在服务层可以直接获取上下文
    public static RequestContext currentContext() {
        return REQUEST_CTX.get();
    }
}

record RequestContext(String userId, String traceId) {}

# 六、迁移建议

# 从ThreadLocal迁移到ScopedValue

适合迁移的场景:

  • 数据单向传递(调用者→被调用者)
  • 数据在作用域内不需要修改
  • 需要在虚拟线程中使用

不适合迁移的场景:

  • 需要双向数据传递
  • 需要在调用链中修改值
  • 缓存昂贵对象(如SimpleDateFormat)

# 从ExecutorService迁移到StructuredTaskScope

适合迁移的场景:

  • 任务有明确的父子关系
  • 需要统一的错误处理和取消机制
  • 使用虚拟线程

保留ExecutorService的场景:

  • 任务之间没有层次关系
  • 需要任务队列和调度功能
  • 需要与现有代码兼容

# 七、总结

特性 状态 主要用途
作用域值 Java 25正式版 替代ThreadLocal,安全高效地传递上下文
结构化并发 Java 25预览版 组织并发任务,统一生命周期管理

作用域值和结构化并发代表了Java并发编程的未来方向。它们与虚拟线程完美配合,提供了更安全、更高效、更易于理解的并发编程模型。

# 参考资料

  • JEP 506: Scoped Values (opens new window)
  • JEP 505: Structured Concurrency (Fifth Preview) (opens new window)
  • JEP 444: Virtual Threads (opens new window)
  • Structured Concurrency and Scoped Values in Java (opens new window)

祝你变得更强!

编辑 (opens new window)
#ScopedValue#StructuredConcurrency#虚拟线程
上次更新: 2025/12/06
Java并发-Fork Join框架
Java并发-调试与诊断

← Java并发-Fork Join框架 Java并发-调试与诊断→

最近更新
01
AI编程时代的一些心得
09-11
02
Claude Code与Codex的协同工作
09-01
03
Claude Code 最佳实践(个人版)
08-01
更多文章>
Theme by Vdoing | Copyright © 2018-2025 京ICP备2021021832号-2 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式