Project Reactor 中的 flatMap、flatMapSequential 和 concatMap 有什么区别?
Whats the difference between flatMap, flatMapSequential and concatMap in Project Reactor?
我从文档中了解到 flatMap
:
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave.
flatMapSequential
:
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, but merge them in the order of their source element.
然后 concatMap
:
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.
There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:
Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
Interleaving: this operator does not let values from different inners interleave (concatenation).
flatMap
和其他两个的区别很好理解,但是我不明白concatMap
和flatMapSequential
的区别是什么时候发生的。两者之间有什么性能差异吗?我读到 flatMapSequential
有一些队列的缓冲区大小,但我不明白为什么 concatMap
不需要。
flatMap
和 flatMapSequential
运算符急切地订阅,concatMap
在生成下一个 sub-stream 并订阅它之前等待每个内部完成。
让我们看一个例子:
@Test
void test_flatMap() {
Flux.just(1, 2, 3)
.flatMap(this::doSomethingAsync)
//.flatMapSequential(this::doSomethingAsync)
//.concatMap(this::doSomethingAsync)
.doOnNext(n -> log.info("Done {}", n))
.blockLast();
}
private Flux<Integer> doSomethingAsync(Integer number) {
//add some delay for the second item...
return number == 2 ? Mono.just(number).doOnNext(n -> log.info("Executing {}", n)).delayElement(Duration.ofSeconds(1))
: Mono.just(number).doOnNext(n -> log.info("Executing {}", n));
}
输出:
2022-04-22 19:38:49,164 INFO main - Executing 1
2022-04-22 19:38:49,168 INFO main - Done 1
2022-04-22 19:38:49,198 INFO main - Executing 2
2022-04-22 19:38:49,200 INFO main - Executing 3
2022-04-22 19:38:49,200 INFO main - Done 3
2022-04-22 19:38:50,200 INFO parallel-1 - Done 2
如您所见,flatMap
不保留原始顺序,并且热切地订阅了所有三个元素。另外,请注意元素 3 在元素 2 之前进行。
这是使用 flatMapSequential
的输出:
2022-04-22 19:53:40,229 INFO main - Executing 1
2022-04-22 19:53:40,232 INFO main - Done 1
2022-04-22 19:53:40,261 INFO main - Executing 2
2022-04-22 19:53:40,263 INFO main - Executing 3
2022-04-22 19:53:41,263 INFO parallel-1 - Done 2
2022-04-22 19:53:41,264 INFO parallel-1 - Done 3
flatMapSequential
已经像 flatMap
一样热切地订阅了所有三个元素,但通过对接收到的乱序元素进行排队来保持顺序。
这是使用 concatMap
的输出:
2022-04-22 19:59:31,817 INFO main - Executing 1
2022-04-22 19:59:31,820 INFO main - Done 1
2022-04-22 19:59:31,853 INFO main - Executing 2
2022-04-22 19:59:32,857 INFO parallel-1 - Done 2
2022-04-22 19:59:32,857 INFO parallel-1 - Executing 3
2022-04-22 19:59:32,857 INFO parallel-1 - Done 3
concatMap
自然保留与源元素相同的顺序。
我从文档中了解到 flatMap
:
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave.
flatMapSequential
:
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, but merge them in the order of their source element.
然后 concatMap
:
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation. There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:
Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
Interleaving: this operator does not let values from different inners interleave (concatenation).
flatMap
和其他两个的区别很好理解,但是我不明白concatMap
和flatMapSequential
的区别是什么时候发生的。两者之间有什么性能差异吗?我读到 flatMapSequential
有一些队列的缓冲区大小,但我不明白为什么 concatMap
不需要。
flatMap
和 flatMapSequential
运算符急切地订阅,concatMap
在生成下一个 sub-stream 并订阅它之前等待每个内部完成。
让我们看一个例子:
@Test
void test_flatMap() {
Flux.just(1, 2, 3)
.flatMap(this::doSomethingAsync)
//.flatMapSequential(this::doSomethingAsync)
//.concatMap(this::doSomethingAsync)
.doOnNext(n -> log.info("Done {}", n))
.blockLast();
}
private Flux<Integer> doSomethingAsync(Integer number) {
//add some delay for the second item...
return number == 2 ? Mono.just(number).doOnNext(n -> log.info("Executing {}", n)).delayElement(Duration.ofSeconds(1))
: Mono.just(number).doOnNext(n -> log.info("Executing {}", n));
}
输出:
2022-04-22 19:38:49,164 INFO main - Executing 1
2022-04-22 19:38:49,168 INFO main - Done 1
2022-04-22 19:38:49,198 INFO main - Executing 2
2022-04-22 19:38:49,200 INFO main - Executing 3
2022-04-22 19:38:49,200 INFO main - Done 3
2022-04-22 19:38:50,200 INFO parallel-1 - Done 2
如您所见,flatMap
不保留原始顺序,并且热切地订阅了所有三个元素。另外,请注意元素 3 在元素 2 之前进行。
这是使用 flatMapSequential
的输出:
2022-04-22 19:53:40,229 INFO main - Executing 1
2022-04-22 19:53:40,232 INFO main - Done 1
2022-04-22 19:53:40,261 INFO main - Executing 2
2022-04-22 19:53:40,263 INFO main - Executing 3
2022-04-22 19:53:41,263 INFO parallel-1 - Done 2
2022-04-22 19:53:41,264 INFO parallel-1 - Done 3
flatMapSequential
已经像 flatMap
一样热切地订阅了所有三个元素,但通过对接收到的乱序元素进行排队来保持顺序。
这是使用 concatMap
的输出:
2022-04-22 19:59:31,817 INFO main - Executing 1
2022-04-22 19:59:31,820 INFO main - Done 1
2022-04-22 19:59:31,853 INFO main - Executing 2
2022-04-22 19:59:32,857 INFO parallel-1 - Done 2
2022-04-22 19:59:32,857 INFO parallel-1 - Executing 3
2022-04-22 19:59:32,857 INFO parallel-1 - Done 3
concatMap
自然保留与源元素相同的顺序。