如何将上下文传播到 Quarkus 中的并行流操作?
How to propagte context to parallelStream operations in Quarkus?
我有一系列简单的链式操作,使用 Panache 存储库检索和保存一些数据,运行 在 Quarkus 服务中。在这些操作并行化的地方,抛出 ContextNotActiveException
。在删除并行化的地方,代码按预期工作。
此代码有效:
dataRepository.get()
.map { convert(it) }
.forEach { perist(it) }
此代码不:
dataRepository.get()
.parallelStream()
.map { convert(it) }
.forEach { perist(it) }
Quarkus 文档非常有限,仅涉及 mutiny 或 RX 的使用。
如何传播上下文以使 parallelStream()
有效?
不幸的是,上下文传播在并行 Java 流中表现不佳,因为使流并行会自动将执行移动到 ForkJoinPool,这意味着您会丢失上下文。您需要以不同的方式处理并行性,而不是让 Java 流为您处理 - 您可能希望使用 org.eclipse.microprofile.context.ManagedExecutor
.
假设 convert
方法出于某种原因需要活动的请求上下文,您需要将其调用分派到托管执行程序中。这将确保传播上下文。
在 Java 代码中,我能想到的与您的代码非常接近的是:
@Inject
org.eclipse.microprofile.context.ManagedExecutor executor;
(...)
dataRepository.streamAll()
.forEach(i -> {
executor.supplyAsync(() -> {
return convert(i);
}).thenAccept(persist(i));
});
我有一系列简单的链式操作,使用 Panache 存储库检索和保存一些数据,运行 在 Quarkus 服务中。在这些操作并行化的地方,抛出 ContextNotActiveException
。在删除并行化的地方,代码按预期工作。
此代码有效:
dataRepository.get()
.map { convert(it) }
.forEach { perist(it) }
此代码不:
dataRepository.get()
.parallelStream()
.map { convert(it) }
.forEach { perist(it) }
Quarkus 文档非常有限,仅涉及 mutiny 或 RX 的使用。
如何传播上下文以使 parallelStream()
有效?
不幸的是,上下文传播在并行 Java 流中表现不佳,因为使流并行会自动将执行移动到 ForkJoinPool,这意味着您会丢失上下文。您需要以不同的方式处理并行性,而不是让 Java 流为您处理 - 您可能希望使用 org.eclipse.microprofile.context.ManagedExecutor
.
假设 convert
方法出于某种原因需要活动的请求上下文,您需要将其调用分派到托管执行程序中。这将确保传播上下文。
在 Java 代码中,我能想到的与您的代码非常接近的是:
@Inject
org.eclipse.microprofile.context.ManagedExecutor executor;
(...)
dataRepository.streamAll()
.forEach(i -> {
executor.supplyAsync(() -> {
return convert(i);
}).thenAccept(persist(i));
});