在并行流上传播 Sleuth baggage

Propagate Sleuth baggage on parallel streams

这个问题和一模一样,实际上没有回答(那个代码只用了一个线程)。我的代码目前看起来像这样

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
            foo.stream().parallel()
                    .forEach(bar -> {
                                //business logic
                            }
                    );
            return null;
        }, new TraceableExecutorService(this.beanFactory, Executors.newFixedThreadPool(threads), "fooBarStream"));

completableFuture.get();

但只有一个线程被正确跟踪。直接使用 .parallelStream()LazyTraceExecutor 而不是 TraceableExecutorService 没有帮助。

由于 this example,似乎可以正常工作。上面的代码片段变为:

TraceableExecutorService executorService = new TraceableExecutorService(this.beanFactory, Executors.newFixedThreadPool(threads), "fooStream");
CompletableFuture.allOf(runnablesBusinessLogic(foo,executorService)).get();

其中 runnablesBusinessLogic

private CompletableFuture<Void>[] runnablesBusinessLogic(List<FooBar> foo, ExecutorService executorService) {
    List<CompletableFuture<?>> futures = new ArrayList<>();
    for (FooBar f : foo) {
        futures.add(CompletableFuture.runAsync(() -> {
            businessLogic(f);
            return;
        }, executorService));
    }
    return futures.toArray(new CompletableFuture[futures.size()]);
}

如果我正确理解示例(以及文档当前状态背后的 discussion),Sleuth 无法自动使用 ForkJoinPool(以及并行流)。让它工作的主要想法不是创建一个 CompletableFuture 并将其拆分,而是创建多个 futures(并加入它们)。