为什么 Reactor 的 Mono.fromCompletionStage 比普通的 CompletableFuture 慢?

Why is Reactor's Mono.fromCompletionStage slower than a plain CompletableFuture?

我有一段简单的代码,在后台 "processes" 数据,并且在每个 nth 项目之后,记录在最后 n 项目上花费的总时间:

class BackgroundWorker implements AutoCloseable {
  private final ExecutorService thread = Executors.newSingleThreadExecutor();
  private final int reportEvery;
  private int processed;
  private LocalTime begin;

  BackgroundWorker(int reportEvery) {
    this.reportEvery = reportEvery;
  }

  CompletableFuture<Boolean> process(int item) {
    var future = new CompletableFuture<Boolean>();
    thread.submit(() ->  {
      try {
        if (processed == 0) {
          begin = LocalTime.now();
        }
        if (++processed == reportEvery) {
          System.out.format("Processed %d items in %dms%n",
              processed, ChronoUnit.MILLIS.between(begin, LocalTime.now()));
          processed = 0;
        }
        future.complete(true);
      } catch (Exception ex) {
        future.complete(false);
      }
    });
    return future;
  }

  @Override
  public void close() {
    thread.shutdownNow();
  }
}

然后我有一个 Flux 将数据馈送到 BackgroundWorker,计算成功完成的 CompletableFuture 秒:

Flux<Integer> numbers = Flux.fromStream(IntStream.range(0, 100000).boxed());
try (var worker = new BackgroundWorker(10000)) {
  int successCount = numbers
      .map(worker::process)
      .map(future -> future.thenApply(success -> success ? 1 : 0))
      .reduce(
          CompletableFuture.completedFuture(0),
          (acc, curr) -> acc.thenCombine(curr, Integer::sum))
      .block()
      .join();

  System.out.println("Done; success: " + successCount);
}

和同一段代码,但现在使用 Mono.fromCompletionStage 代替:

int successCount = numbers
    .map(worker::process)
    .map(Mono::fromCompletionStage)
    .map(mono -> mono.map(success -> success ? 1 : 0))
    .reduce(
        Mono.just(0),
        (acc, curr) -> acc.zipWith(curr, Integer::sum))
    .block()
    .block();

第一个使用 futures 的人会打印如下内容:

Processed 10000 items in 48ms
Processed 10000 items in 17ms
Processed 10000 items in 10ms
Processed 10000 items in 8ms
Processed 10000 items in 9ms
Processed 10000 items in 5ms
Processed 10000 items in 5ms
Processed 10000 items in 4ms
Processed 10000 items in 3ms
Processed 10000 items in 4ms
Done; success: 100000

但是使用 Mono.fromCompletionStage 的版本打印:

Processed 10000 items in 138ms
Processed 10000 items in 253ms
Processed 10000 items in 327ms
Processed 10000 items in 477ms
Processed 10000 items in 315ms
Processed 10000 items in 379ms
Processed 10000 items in 448ms
Processed 10000 items in 509ms
Processed 10000 items in 595ms
Processed 10000 items in 668ms
Done; success: 100000

为什么使用 Mono 而不是 CompletableFuture 会大大降低性能?

似乎 Monos 的压缩占用了最多的时间并以某种方式影响了执行。可能是因为像那样压缩每次都会创建一个新的 MonoZip 实例。

但此时您不必使用缩小和压缩。 flatMap monos 更惯用,获得一个 Flux<Integer>,你将在不创建中间垃圾的情况下减少它。

此外,由于 futures 基本上是在创建时开始处理,你可以做一个更简单的 concatMap(更少的开销,并且不必等待每个单声道的完成在这一点上并不重要,因为所有无论如何,期货已经 运行 在后台):

Flux<Integer> numbers = Flux.fromStream(IntStream.range(0, 100_000).boxed());
try (BackgroundWorker worker = new BackgroundWorker(10000)) {
    int successCount = numbers
            .map(worker::process)
            .concatMap(future -> Mono.fromCompletionStage(future))
            .map(success -> success ? 1 : 0)
            .reduce(0, Integer::sum)
            .block();

    System.out.println("Done; success: " + successCount);
}

您甚至可以通过避免从 boolean 到 int 的映射并在 reduce 中执行此操作来减少更多开销:

.reduce(0, (acc, bool) -> bool ? acc + 1 : acc)