FlatMap 何时会同时监听多个源?
When FlatMap will listen to multiple sources concurrently?
什么情况导致Flux::flatMap
同时收听多个来源(0...无限)?
我在实验时发现,当上游在线程 thread-upstream-1
中向 flatMap
发送信号时,flatMap 将监听 N
个内部流,并且每个内部流在 不同的 线程中发送信号:thread-inner-stream-i
对于 1<=i<=N
,而不是对于每个 1<=i<=N
如果 thread-upstream-1 != thread-inner-stream-i
,flatMap
将监听并发到所有内部流。
我认为这不完全正确,我错过了其他一些场景。
flatMap
不做任何并行工作,如:它不改变线程。最简单的例子是
Flux.range(1, 5).hide()
.flatMap(v -> Flux.range(10 * v, 2))
.log()
.blockLast(); //for test purpose
这会打印:
[main] INFO reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.FlatMap.1 - request(unbounded)
[main] INFO reactor.Flux.FlatMap.1 - onNext(10)
[main] INFO reactor.Flux.FlatMap.1 - onNext(11)
[main] INFO reactor.Flux.FlatMap.1 - onNext(20)
[main] INFO reactor.Flux.FlatMap.1 - onNext(21)
[main] INFO reactor.Flux.FlatMap.1 - onNext(30)
[main] INFO reactor.Flux.FlatMap.1 - onNext(31)
[main] INFO reactor.Flux.FlatMap.1 - onNext(40)
[main] INFO reactor.Flux.FlatMap.1 - onNext(41)
[main] INFO reactor.Flux.FlatMap.1 - onNext(50)
[main] INFO reactor.Flux.FlatMap.1 - onNext(51)
[main] INFO reactor.Flux.FlatMap.1 - onComplete()
如您所见,仅在 main
中生产。如果在初始范围后添加 publishOn
,flatMap
会在 publishOn 将切换到的同一单线程中生成所有内容。
然而 flatMap
所做的是 订阅 到多个内部 Publisher
,直到 concurrency
参数,默认值为 [=21] =] (256).
这意味着如果您将其设置为 3
,flatMap
会将 3 个源元素映射到它们的内部 Publisher
并订阅这些发布者,但至少会等待一个在开始映射更多源元素之前完成。
如果内Publisher
使用publishOn
或subscribeOn
,那么flatMap
自然会让他们的事件发生在当时定义的线程中:
Flux.range(1, 5).hide()
.flatMap(v -> Flux.range(v * 10, 2)
.publishOn(Schedulers.newParallel("foo", 3)))
.flatMap(v -> Flux.range(10 * v, 2))
.log()
.blockLast(); //for test purpose
打印:
[main] INFO reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.FlatMap.1 - request(unbounded)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(10)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(11)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(20)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(21)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(30)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(31)
[foo-4] INFO reactor.Flux.FlatMap.1 - onNext(50)
[foo-4] INFO reactor.Flux.FlatMap.1 - onNext(51)
[foo-4] INFO reactor.Flux.FlatMap.1 - onNext(40)
[foo-4] INFO reactor.Flux.FlatMap.1 - onNext(41)
[foo-4] INFO reactor.Flux.FlatMap.1 - onComplete()
什么情况导致Flux::flatMap
同时收听多个来源(0...无限)?
我在实验时发现,当上游在线程 thread-upstream-1
中向 flatMap
发送信号时,flatMap 将监听 N
个内部流,并且每个内部流在 不同的 线程中发送信号:thread-inner-stream-i
对于 1<=i<=N
,而不是对于每个 1<=i<=N
如果 thread-upstream-1 != thread-inner-stream-i
,flatMap
将监听并发到所有内部流。
我认为这不完全正确,我错过了其他一些场景。
flatMap
不做任何并行工作,如:它不改变线程。最简单的例子是
Flux.range(1, 5).hide()
.flatMap(v -> Flux.range(10 * v, 2))
.log()
.blockLast(); //for test purpose
这会打印:
[main] INFO reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.FlatMap.1 - request(unbounded)
[main] INFO reactor.Flux.FlatMap.1 - onNext(10)
[main] INFO reactor.Flux.FlatMap.1 - onNext(11)
[main] INFO reactor.Flux.FlatMap.1 - onNext(20)
[main] INFO reactor.Flux.FlatMap.1 - onNext(21)
[main] INFO reactor.Flux.FlatMap.1 - onNext(30)
[main] INFO reactor.Flux.FlatMap.1 - onNext(31)
[main] INFO reactor.Flux.FlatMap.1 - onNext(40)
[main] INFO reactor.Flux.FlatMap.1 - onNext(41)
[main] INFO reactor.Flux.FlatMap.1 - onNext(50)
[main] INFO reactor.Flux.FlatMap.1 - onNext(51)
[main] INFO reactor.Flux.FlatMap.1 - onComplete()
如您所见,仅在 main
中生产。如果在初始范围后添加 publishOn
,flatMap
会在 publishOn 将切换到的同一单线程中生成所有内容。
然而 flatMap
所做的是 订阅 到多个内部 Publisher
,直到 concurrency
参数,默认值为 [=21] =] (256).
这意味着如果您将其设置为 3
,flatMap
会将 3 个源元素映射到它们的内部 Publisher
并订阅这些发布者,但至少会等待一个在开始映射更多源元素之前完成。
如果内Publisher
使用publishOn
或subscribeOn
,那么flatMap
自然会让他们的事件发生在当时定义的线程中:
Flux.range(1, 5).hide()
.flatMap(v -> Flux.range(v * 10, 2)
.publishOn(Schedulers.newParallel("foo", 3)))
.flatMap(v -> Flux.range(10 * v, 2))
.log()
.blockLast(); //for test purpose
打印:
[main] INFO reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.FlatMap.1 - request(unbounded)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(10)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(11)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(20)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(21)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(30)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(31)
[foo-4] INFO reactor.Flux.FlatMap.1 - onNext(50)
[foo-4] INFO reactor.Flux.FlatMap.1 - onNext(51)
[foo-4] INFO reactor.Flux.FlatMap.1 - onNext(40)
[foo-4] INFO reactor.Flux.FlatMap.1 - onNext(41)
[foo-4] INFO reactor.Flux.FlatMap.1 - onComplete()