在并行流上传播 Sleuth baggage
Propagate Sleuth baggage on parallel streams
这个问题和一模一样,实际上没有回答(那个代码只用了一个线程)。我的代码目前看起来像这样
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
foo.stream().parallel()
.forEach(bar -> {
//business logic
}
);
return null;
}, new TraceableExecutorService(this.beanFactory, Executors.newFixedThreadPool(threads), "fooBarStream"));
completableFuture.get();
但只有一个线程被正确跟踪。直接使用 .parallelStream()
或 LazyTraceExecutor
而不是 TraceableExecutorService
没有帮助。
由于 this example,似乎可以正常工作。上面的代码片段变为:
TraceableExecutorService executorService = new TraceableExecutorService(this.beanFactory, Executors.newFixedThreadPool(threads), "fooStream");
CompletableFuture.allOf(runnablesBusinessLogic(foo,executorService)).get();
其中 runnablesBusinessLogic
是
private CompletableFuture<Void>[] runnablesBusinessLogic(List<FooBar> foo, ExecutorService executorService) {
List<CompletableFuture<?>> futures = new ArrayList<>();
for (FooBar f : foo) {
futures.add(CompletableFuture.runAsync(() -> {
businessLogic(f);
return;
}, executorService));
}
return futures.toArray(new CompletableFuture[futures.size()]);
}
如果我正确理解示例(以及文档当前状态背后的 discussion),Sleuth 无法自动使用 ForkJoinPool
(以及并行流)。让它工作的主要想法不是创建一个 CompletableFuture
并将其拆分,而是创建多个 futures(并加入它们)。
这个问题和
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
foo.stream().parallel()
.forEach(bar -> {
//business logic
}
);
return null;
}, new TraceableExecutorService(this.beanFactory, Executors.newFixedThreadPool(threads), "fooBarStream"));
completableFuture.get();
但只有一个线程被正确跟踪。直接使用 .parallelStream()
或 LazyTraceExecutor
而不是 TraceableExecutorService
没有帮助。
由于 this example,似乎可以正常工作。上面的代码片段变为:
TraceableExecutorService executorService = new TraceableExecutorService(this.beanFactory, Executors.newFixedThreadPool(threads), "fooStream");
CompletableFuture.allOf(runnablesBusinessLogic(foo,executorService)).get();
其中 runnablesBusinessLogic
是
private CompletableFuture<Void>[] runnablesBusinessLogic(List<FooBar> foo, ExecutorService executorService) {
List<CompletableFuture<?>> futures = new ArrayList<>();
for (FooBar f : foo) {
futures.add(CompletableFuture.runAsync(() -> {
businessLogic(f);
return;
}, executorService));
}
return futures.toArray(new CompletableFuture[futures.size()]);
}
如果我正确理解示例(以及文档当前状态背后的 discussion),Sleuth 无法自动使用 ForkJoinPool
(以及并行流)。让它工作的主要想法不是创建一个 CompletableFuture
并将其拆分,而是创建多个 futures(并加入它们)。