Project Reactor 中的 flatMap、flatMapSequential 和 concatMap 有什么区别?

Whats the difference between flatMap, flatMapSequential and concatMap in Project Reactor?

我从文档中了解到 flatMap:

Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave.

flatMapSequential:

Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, but merge them in the order of their source element.

然后 concatMap:

Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation. There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:

Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.

Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.

Interleaving: this operator does not let values from different inners interleave (concatenation).

flatMap和其他两个的区别很好理解,但是我不明白concatMapflatMapSequential的区别是什么时候发生的。两者之间有什么性能差异吗?我读到 flatMapSequential 有一些队列的缓冲区大小,但我不明白为什么 concatMap 不需要。

flatMapflatMapSequential 运算符急切地订阅,concatMap 在生成下一个 sub-stream 并订阅它之前等待每个内部完成。

让我们看一个例子:

  @Test
  void test_flatMap() {
    Flux.just(1, 2, 3)
        .flatMap(this::doSomethingAsync)
        //.flatMapSequential(this::doSomethingAsync)
        //.concatMap(this::doSomethingAsync)
        .doOnNext(n -> log.info("Done {}", n))
        .blockLast();
  }

  private Flux<Integer> doSomethingAsync(Integer number) {
    //add some delay for the second item...
    return number == 2 ? Mono.just(number).doOnNext(n -> log.info("Executing {}", n)).delayElement(Duration.ofSeconds(1))
        : Mono.just(number).doOnNext(n -> log.info("Executing {}", n));
  }

输出:

2022-04-22 19:38:49,164  INFO main - Executing 1
2022-04-22 19:38:49,168  INFO main - Done 1
2022-04-22 19:38:49,198  INFO main - Executing 2
2022-04-22 19:38:49,200  INFO main - Executing 3
2022-04-22 19:38:49,200  INFO main - Done 3
2022-04-22 19:38:50,200  INFO parallel-1 - Done 2

如您所见,flatMap 不保留原始顺序,并且热切地订阅了所有三个元素。另外,请注意元素 3 在元素 2 之前进行。

这是使用 flatMapSequential 的输出:

2022-04-22 19:53:40,229  INFO main - Executing 1
2022-04-22 19:53:40,232  INFO main - Done 1
2022-04-22 19:53:40,261  INFO main - Executing 2
2022-04-22 19:53:40,263  INFO main - Executing 3
2022-04-22 19:53:41,263  INFO parallel-1 - Done 2
2022-04-22 19:53:41,264  INFO parallel-1 - Done 3

flatMapSequential 已经像 flatMap 一样热切地订阅了所有三个元素,但通过对接收到的乱序元素进行排队来保持顺序。

这是使用 concatMap 的输出:

2022-04-22 19:59:31,817  INFO main - Executing 1
2022-04-22 19:59:31,820  INFO main - Done 1
2022-04-22 19:59:31,853  INFO main - Executing 2
2022-04-22 19:59:32,857  INFO parallel-1 - Done 2
2022-04-22 19:59:32,857  INFO parallel-1 - Executing 3
2022-04-22 19:59:32,857  INFO parallel-1 - Done 3

concatMap 自然保留与源元素相同的顺序。