Java 的作用域值(Scoped Values)和结构化并发(Structured Concurrency)
以下代码在Java 24 preview测试通过不使用虚拟线程
package org.example;
import java.util.concurrent.*;
public class ScopedValueWithVirtualThreads {
void main() {
final String THREAD_S_N = "%s read USER_ID = %s, Thread: %s%n";
final ScopedValue<String> USER_ID = ScopedValue.newInstance();
// 将 ScopedValue 绑定到当前线程作用域
ScopedValue.where(USER_ID, "user-123").run(() -> {
try (var scope = new StructuredTaskScope.ShutdownOnFailure("Thread", Executors.defaultThreadFactory())) {
// 在子线程中读取 ScopedValue(会继承父线程的绑定)
var f1 = scope.fork(() -> String.format(THREAD_S_N, "Task1", USER_ID.get(), Thread.currentThread()));
var f2 = scope.fork(() -> String.format(THREAD_S_N, "Task2", USER_ID.get(), Thread.currentThread()));
scope.join();
scope.throwIfFailed();
System.out.printf(f1.get());
System.out.printf(f2.get());
System.out.printf(THREAD_S_N, "In-try", USER_ID.get(), Thread.currentThread());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}输出结果:
Task1 read USER_ID = user-123, Thread: Thread[#26,pool-1-thread-1,5,main]
Task2 read USER_ID = user-123, Thread: Thread[#27,pool-1-thread-2,5,main]
In-try read USER_ID = user-123, Thread: Thread[#3,main,5,main]
使用虚拟线程
package org.example;
import java.util.concurrent.*;
public class ScopedValueWithVirtualThreads {
void main() {
final String THREAD_S_N = "%s read USER_ID = %s, Thread: %s%n";
final ScopedValue<String> USER_ID = ScopedValue.newInstance();
// 将 ScopedValue 绑定到当前线程作用域
ScopedValue.where(USER_ID, "user-123").run(() -> {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 在子线程中读取 ScopedValue(会继承父线程的绑定)
var f1 = scope.fork(() -> String.format(THREAD_S_N, "Task1", USER_ID.get(), Thread.currentThread()));
var f2 = scope.fork(() -> String.format(THREAD_S_N, "Task2", USER_ID.get(), Thread.currentThread()));
scope.join();
scope.throwIfFailed();
System.out.printf(f1.get());
System.out.printf(f2.get());
System.out.printf(THREAD_S_N, "In-try", USER_ID.get(), Thread.currentThread());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}输出结果:
Task1 read USER_ID = user-123, Thread: VirtualThread[#27]/runnable@ForkJoinPool-1-worker-1
Task2 read USER_ID = user-123, Thread: VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1
In-try read USER_ID = user-123, Thread: Thread[#3,main,5,main]
解释
[*]try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
[*]创建一个 结构化任务作用域 scope,策略是 ShutdownOnFailure。
[*]含义:如果任何一个子任务抛出异常,自动取消其他所有子任务。
[*]自动资源管理:scope.close() 会在 try 块结束时被调用,做清理工作。
[*]scope.fork(...),使用 scope.fork() 启动一个新的子任务
[*]scope.join();,等待所有子任务完成(不论成功还是失败)。
[*]注意:这不会抛出异常,只是阻塞当前线程直到所有任务都完成。
[*]scope.throwIfFailed();,如果有任何子任务抛出异常,就把它的异常重新抛出,其他任务已被取消。
[*]ShutdownOnFailure 策略保证了只要有失败,就立即停止其他任务的运行(或阻止它们开始)。
什么是作用域值
[*]在父线程中使用 ScopedValue.where() 绑定了 USER_ID = "user-123";
[*]子线程是通过 StructuredTaskScope.fork() 启动的,它们自动继承了这个绑定;
[*]所以在 f1 和 f2 中访问 USER_ID.get() 会得到 "user-123",不需要重新传参;
[*]这种继承是线程安全的、只读的,不像 ThreadLocal 容易被污染或修改。
来源:豆瓜网用户自行投稿发布,如果侵权,请联系站长删除
页:
[1]