蔺蓉城 发表于 2025-7-6 19:12:38

Java 的作用域值(Scoped Values)和结构化并发(Structured Concurrency)

以下代码在Java 24 preview测试通过
不使用虚拟线程

package org.example;

import java.util.concurrent.*;

public class ScopedValueWithVirtualThreads {
    void main() {
      final String THREAD_S_N = "%s read USER_ID = %s, Thread: %s%n";
      final ScopedValue<String> USER_ID = ScopedValue.newInstance();
      // 将 ScopedValue 绑定到当前线程作用域
      ScopedValue.where(USER_ID, "user-123").run(() -> {
            try (var scope = new StructuredTaskScope.ShutdownOnFailure("Thread", Executors.defaultThreadFactory())) {
                // 在子线程中读取 ScopedValue(会继承父线程的绑定)
                var f1 = scope.fork(() -> String.format(THREAD_S_N, "Task1", USER_ID.get(), Thread.currentThread()));
                var f2 = scope.fork(() -> String.format(THREAD_S_N, "Task2", USER_ID.get(), Thread.currentThread()));

                scope.join();
                scope.throwIfFailed();

                System.out.printf(f1.get());
                System.out.printf(f2.get());
                System.out.printf(THREAD_S_N, "In-try", USER_ID.get(), Thread.currentThread());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
      });
    }
}输出结果:
Task1 read USER_ID = user-123, Thread: Thread[#26,pool-1-thread-1,5,main]
Task2 read USER_ID = user-123, Thread: Thread[#27,pool-1-thread-2,5,main]
In-try read USER_ID = user-123, Thread: Thread[#3,main,5,main]
使用虚拟线程

package org.example;

import java.util.concurrent.*;

public class ScopedValueWithVirtualThreads {
    void main() {
      final String THREAD_S_N = "%s read USER_ID = %s, Thread: %s%n";
      final ScopedValue<String> USER_ID = ScopedValue.newInstance();
      // 将 ScopedValue 绑定到当前线程作用域
      ScopedValue.where(USER_ID, "user-123").run(() -> {
            try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
                // 在子线程中读取 ScopedValue(会继承父线程的绑定)
                var f1 = scope.fork(() -> String.format(THREAD_S_N, "Task1", USER_ID.get(), Thread.currentThread()));
                var f2 = scope.fork(() -> String.format(THREAD_S_N, "Task2", USER_ID.get(), Thread.currentThread()));

                scope.join();
                scope.throwIfFailed();

                System.out.printf(f1.get());
                System.out.printf(f2.get());
                System.out.printf(THREAD_S_N, "In-try", USER_ID.get(), Thread.currentThread());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
      });
    }
}输出结果:
Task1 read USER_ID = user-123, Thread: VirtualThread[#27]/runnable@ForkJoinPool-1-worker-1
Task2 read USER_ID = user-123, Thread: VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1
In-try read USER_ID = user-123, Thread: Thread[#3,main,5,main]
解释


[*]try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

[*]创建一个 结构化任务作用域 scope,策略是 ShutdownOnFailure。
[*]含义:如果任何一个子任务抛出异常,自动取消其他所有子任务。
[*]自动资源管理:scope.close() 会在 try 块结束时被调用,做清理工作。

[*]scope.fork(...),使用 scope.fork() 启动一个新的子任务
[*]scope.join();,等待所有子任务完成(不论成功还是失败)。

[*]注意:这不会抛出异常,只是阻塞当前线程直到所有任务都完成。

[*]scope.throwIfFailed();,如果有任何子任务抛出异常,就把它的异常重新抛出,其他任务已被取消。

[*]ShutdownOnFailure 策略保证了只要有失败,就立即停止其他任务的运行(或阻止它们开始)。

什么是作用域值


[*]在父线程中使用 ScopedValue.where() 绑定了 USER_ID = "user-123";
[*]子线程是通过 StructuredTaskScope.fork() 启动的,它们自动继承了这个绑定;
[*]所以在 f1 和 f2 中访问 USER_ID.get() 会得到 "user-123",不需要重新传参;
[*]这种继承是线程安全的、只读的,不像 ThreadLocal 容易被污染或修改。

来源:豆瓜网用户自行投稿发布,如果侵权,请联系站长删除
页: [1]
查看完整版本: Java 的作用域值(Scoped Values)和结构化并发(Structured Concurrency)