flatMap 是如何管理线程的?
How does flatMap manage threads?
Flux.just("a", "b", "c")
.log(null, Level.INFO,true) // line 18
.flatMap(value -> Mono.just(value.toUpperCase())
.publishOn(Schedulers.elastic()), 2)
.log(null, Level.INFO,true) // line 21
.subscribe();
部分输出:
13:03:46 [main] INFO - | request(2) Flux.log(App.java:18)
13:03:46 [main] INFO - | onNext(a) Flux.log(App.java:18)
13:03:46 [main] INFO - | onNext(b) Flux.log(App.java:18)
13:03:46 [elastic-2] INFO - onNext(A) Flux.log(App.java:21)
13:03:46 [elastic-2] INFO - | request(1) Flux.log(App.java:18)
13:03:46 [main] INFO - | onNext(c) Flux.log(App.java:18)
13:03:46 [elastic-3] INFO - onNext(B) Flux.log(App.java:21)
13:03:46 [elastic-3] INFO - | request(1) Flux.log(App.java:18)
13:03:46 [elastic-2] INFO - onNext(C) Flux.log(App.java:21)
13:03:46 [elastic-2] INFO - | request(1) Flux.log(App.java:18)
13:03:46 [main] INFO - | onComplete() Flux.log(App.java:18)
13:03:46 [main] INFO - onComplete() Flux.log(App.java:21)
问题:
为什么 flatMap
从 main
线程请求 2 个元素,然后从其他线程请求更多元素?
为什么 subscribe
没有被 main
线程处理?
为什么从主线程请求 2?
第一个 Subscription.request
数量取决于您指定的并发级别,即 2
。由于您在主线程中调用 .subscribe
,第一个 prefetch
请求将在该线程上被调用。
让我们看看下一个架构:
.subscribe()[Thread main]
-> FluxLog.source.subscribe()[Tread Main]
-> FluxFlatMap.source.subscribe()[ThreadMain]
-> FluxJust.subscriber.onSubscribe()
-> FluxFlatMap.subscription.request(concurrency)[Thread Main]
接下来会发生什么?
那么,从那一点开始就是铁杆:)。由于您的内部流将被 FlatMapInner
which will observe all signals (onNext, onError, onComplete) on Scheduler.elastic
(because of your .publishOn
). In turn, when the inner stream has been completed, the FlatMapInnner
on its onComplete
will notify main FlatMapMain
which is a driver of whole flatMap
mechanism. Interaction between FlatMapInner
and FlatMapMain
is being over FlatMapMain.innerComplete
. Since, from the FlatMapMain perspective, internal FlatMapInner
is playing the role of Queue
, all elements will be drained
订阅。保持冷静,如果你不知道这里发生了什么,不要惊慌。该方法的全部想法是从内部流中排出数据并将其发送到下游,然后向上游请求新的数据部分。你应该记住的是 innerComplete
是从 FlatMapInner.onComplete
调用的,它被移动到另一个调度程序,所以这意味着下一个 Subscription.request
将从 [=35= 中指定的线程调用]
所以,接下来的流程大致如下:
FluxFlatMap.FlatMapMain.onNext [Thread Main]
-> Publisher m = mapper(...)
-> m.subscribe(new FluxFlatMap.FlatMapInner())
-> FluxFlatMap.FlatMapInner.onNext("a") [Thread Elastic N]
-> LambdaSubscriber.onNext("c") [Thread Elastic N]
-> FluxFlatMap.FlatMapInner.onComplete() [Thread Elastic N]
-> FluxFlatMap.FlatMapMain.drainLoop() [Thread Elastic N]
-> FluxFlatMap.FlatMapMain.drainLoop() [Thread Elastic N] { ... subscription.request(amountOfCompletedInners)
-> FlatMap.FlatMapMain.onNext() [Thread Elastic N]
-> .... LambdaSubscriber.onNext("c") [Thread Elastic N]
-> ....
这样,你会看到main上的第一个request(2),然后是elastic的request(1)(因为一个inner已经完成,所以FlatMap会从upstream再请求1个元素来满足并发的需求)。
Flux.just("a", "b", "c")
.log(null, Level.INFO,true) // line 18
.flatMap(value -> Mono.just(value.toUpperCase())
.publishOn(Schedulers.elastic()), 2)
.log(null, Level.INFO,true) // line 21
.subscribe();
部分输出:
13:03:46 [main] INFO - | request(2) Flux.log(App.java:18)
13:03:46 [main] INFO - | onNext(a) Flux.log(App.java:18)
13:03:46 [main] INFO - | onNext(b) Flux.log(App.java:18)
13:03:46 [elastic-2] INFO - onNext(A) Flux.log(App.java:21)
13:03:46 [elastic-2] INFO - | request(1) Flux.log(App.java:18)
13:03:46 [main] INFO - | onNext(c) Flux.log(App.java:18)
13:03:46 [elastic-3] INFO - onNext(B) Flux.log(App.java:21)
13:03:46 [elastic-3] INFO - | request(1) Flux.log(App.java:18)
13:03:46 [elastic-2] INFO - onNext(C) Flux.log(App.java:21)
13:03:46 [elastic-2] INFO - | request(1) Flux.log(App.java:18)
13:03:46 [main] INFO - | onComplete() Flux.log(App.java:18)
13:03:46 [main] INFO - onComplete() Flux.log(App.java:21)
问题:
为什么
flatMap
从main
线程请求 2 个元素,然后从其他线程请求更多元素?为什么
subscribe
没有被main
线程处理?
为什么从主线程请求 2?
第一个 Subscription.request
数量取决于您指定的并发级别,即 2
。由于您在主线程中调用 .subscribe
,第一个 prefetch
请求将在该线程上被调用。
让我们看看下一个架构:
.subscribe()[Thread main]
-> FluxLog.source.subscribe()[Tread Main]
-> FluxFlatMap.source.subscribe()[ThreadMain]
-> FluxJust.subscriber.onSubscribe()
-> FluxFlatMap.subscription.request(concurrency)[Thread Main]
接下来会发生什么?
那么,从那一点开始就是铁杆:)。由于您的内部流将被 FlatMapInner
which will observe all signals (onNext, onError, onComplete) on Scheduler.elastic
(because of your .publishOn
). In turn, when the inner stream has been completed, the FlatMapInnner
on its onComplete
will notify main FlatMapMain
which is a driver of whole flatMap
mechanism. Interaction between FlatMapInner
and FlatMapMain
is being over FlatMapMain.innerComplete
. Since, from the FlatMapMain perspective, internal FlatMapInner
is playing the role of Queue
, all elements will be drained
订阅。保持冷静,如果你不知道这里发生了什么,不要惊慌。该方法的全部想法是从内部流中排出数据并将其发送到下游,然后向上游请求新的数据部分。你应该记住的是 innerComplete
是从 FlatMapInner.onComplete
调用的,它被移动到另一个调度程序,所以这意味着下一个 Subscription.request
将从 [=35= 中指定的线程调用]
所以,接下来的流程大致如下:
FluxFlatMap.FlatMapMain.onNext [Thread Main]
-> Publisher m = mapper(...)
-> m.subscribe(new FluxFlatMap.FlatMapInner())
-> FluxFlatMap.FlatMapInner.onNext("a") [Thread Elastic N]
-> LambdaSubscriber.onNext("c") [Thread Elastic N]
-> FluxFlatMap.FlatMapInner.onComplete() [Thread Elastic N]
-> FluxFlatMap.FlatMapMain.drainLoop() [Thread Elastic N]
-> FluxFlatMap.FlatMapMain.drainLoop() [Thread Elastic N] { ... subscription.request(amountOfCompletedInners)
-> FlatMap.FlatMapMain.onNext() [Thread Elastic N]
-> .... LambdaSubscriber.onNext("c") [Thread Elastic N]
-> ....
这样,你会看到main上的第一个request(2),然后是elastic的request(1)(因为一个inner已经完成,所以FlatMap会从upstream再请求1个元素来满足并发的需求)。