Reactor信号的基本问题
Basic questions about Reactor signals
我对以下代码的输出有一些疑问:
Flux.just("a", "b", "c", "d")
.log(null, Level.INFO, true) // line: 18
.flatMap(value ->
Mono.just(value.toUpperCase()).publishOn(Schedulers.elastic()), 2)
.log(null, Level.INFO, true) // line: 21
.take(3)
.log(null, Level.INFO, true) // line: 23
.subscribe(x ->
System.out.println("Thread: " + Thread.currentThread().getName() +
" , " + x));
Thread.sleep(1000 * 1000);
输出:
1. 11:29:11 [main] INFO - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) Flux.log(App.java:18)
2. 11:29:11 [main] INFO - onSubscribe(FluxFlatMap.FlatMapMain) Flux.log(App.java:21)
3. 11:29:11 [main] INFO - onSubscribe(FluxTake.TakeSubscriber) Flux.log(App.java:23)
4. 11:29:11 [main] INFO - request(unbounded) Flux.log(App.java:23)
5. 11:29:11 [main] INFO - request(unbounded) Flux.log(App.java:21)
6. 11:29:11 [main] INFO - | request(2) Flux.log(App.java:18)
7. 11:29:11 [main] INFO - | onNext(a) Flux.log(App.java:18)
8. 11:29:11 [main] INFO - | onNext(b) Flux.log(App.java:18)
9. 11:29:11 [elastic-2] INFO - onNext(A) Flux.log(App.java:21)
10. 11:29:11 [elastic-2] INFO - onNext(A) Flux.log(App.java:23)
11. Thread: elastic-2 , A
12. 11:29:11 [elastic-2] INFO - | request(1) Flux.log(App.java:18)
13. 11:29:11 [main] INFO - | onNext(c) Flux.log(App.java:18)
14. 11:29:11 [elastic-3] INFO - onNext(B) Flux.log(App.java:21)
15. 11:29:11 [elastic-3] INFO - onNext(B) Flux.log(App.java:23)
16. Thread: elastic-3 , B
17. 11:29:11 [elastic-3] INFO - | request(1) Flux.log(App.java:18)
18. 11:29:11 [elastic-3] INFO - | onNext(d) Flux.log(App.java:18)
19. 11:29:11 [elastic-3] INFO - | onComplete() Flux.log(App.java:18)
20. 11:29:11 [elastic-3] INFO - onNext(C) Flux.log(App.java:21)
21. 11:29:11 [elastic-3] INFO - onNext(C) Flux.log(App.java:23)
22. Thread: elastic-3 , C
23. 11:29:11 [elastic-3] INFO - cancel() Flux.log(App.java:21)
24. 11:29:11 [elastic-3] INFO - onComplete() Flux.log(App.java:23)
25. 11:29:11 [elastic-3] INFO - | cancel() Flux.log(App.java:18)
问题:每个问题都是关于输出中的特定行(而不是代码中的一行)。我还添加了我对其中一些答案的回答,但我不确定我是否正确。
订阅时,订阅操作要求unbounded
个元素。那么为什么事件:request(unbounded)
在管道中下降而不是上升?我的回答:unbounded
数量的请求上升到 take
然后 take
再次发送它。
flatMap
发送cancel
信号。为什么 take
不发送呢?
最后一个问题:输出的终端信号不止一个。这不是对反应流规范的评价吗?
那样的话,只会产生一个终端信号。
Flux.just("a", "b", "c", "d")
.log(null, Level.INFO, true) // line: 18
.flatMap(value ->
Mono.just(value.toUpperCase()).publishOn(Schedulers.elastic()), 2)
.log(null, Level.INFO, true) // line: 21
.take(3)
.log(null, Level.INFO, true) // line: 23
.subscribe(x ->
System.out.println("Thread: " + Thread.currentThread().getName() +
" , " + x), t -> {}, () -> System.out.println("Completed ""Only Once"));
这里棘手的部分是每个 Reactor 3 操作员都有自己的生命,他们都按照相同的规则进行游戏 - 发出 onComplete
以通知下游操作员不再有数据。
由于您有 .log()
运算符和三个不同的点,因此您将观察到来自 .just
、.flatMap
和 [=16= 的三个独立 onComplete
信号].
首先,您会从 .just
中看到 onComplete
,因为 .flatMap
的默认行为是“好的,我们先尝试请求 concurrency
个元素,然后让我们看看它是怎么回事',因为 .just
可能只产生(在你的情况下)4 个元素,在 2(在你的例子中是并发级别)请求的需求它将发出 2 onNext
并且在两个 request(1)
你会看到 onComplete
。反过来,emitted onComplete
让 .flatMap
知道当 4 个扁平流发出它们的 .onComplete
信号时,它将被允许向下游发出自己的 onComplete
。
反过来,下游是 .take(3)
运算符,它也在前三个元素之后发出自己的 onComplete
信号,而无需等待上游 onComplete
。由于在 .take
之后有 .log
运算符,此信号也将被记录。
最后,在你的流程中,你有 3 个独立的日志操作员,它们将记录来自 3 个独立操作员的 3 个独立 onComplete
,但尽管如此,最终终端 .subscribe
将只收到一个 onComplete
从第一个操作员到流程。
关于 .take
行为的小更新
.take
的中心思想是获取元素直到满足剩余计数。由于上游可能产生比请求更多的数据,我们需要有一种机制来防止发送更多数据。 Reactive-Streams 规范为我们提供的机制之一是 Subscription
上的协作。订阅有两种主要方法 - request
- 显示需求和 cancel
- 显示不再需要数据,即使请求的需求未得到满足。
在 .take
运算符的情况下,initial demand is Long.MAX_VALUE
, which considers as unbounded demand. Therefore, the only way to stop consuming potentially infinitive stream of data is to cancel 订阅,或者换句话说取消订阅
希望对你有帮助:)
我对以下代码的输出有一些疑问:
Flux.just("a", "b", "c", "d")
.log(null, Level.INFO, true) // line: 18
.flatMap(value ->
Mono.just(value.toUpperCase()).publishOn(Schedulers.elastic()), 2)
.log(null, Level.INFO, true) // line: 21
.take(3)
.log(null, Level.INFO, true) // line: 23
.subscribe(x ->
System.out.println("Thread: " + Thread.currentThread().getName() +
" , " + x));
Thread.sleep(1000 * 1000);
输出:
1. 11:29:11 [main] INFO - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) Flux.log(App.java:18)
2. 11:29:11 [main] INFO - onSubscribe(FluxFlatMap.FlatMapMain) Flux.log(App.java:21)
3. 11:29:11 [main] INFO - onSubscribe(FluxTake.TakeSubscriber) Flux.log(App.java:23)
4. 11:29:11 [main] INFO - request(unbounded) Flux.log(App.java:23)
5. 11:29:11 [main] INFO - request(unbounded) Flux.log(App.java:21)
6. 11:29:11 [main] INFO - | request(2) Flux.log(App.java:18)
7. 11:29:11 [main] INFO - | onNext(a) Flux.log(App.java:18)
8. 11:29:11 [main] INFO - | onNext(b) Flux.log(App.java:18)
9. 11:29:11 [elastic-2] INFO - onNext(A) Flux.log(App.java:21)
10. 11:29:11 [elastic-2] INFO - onNext(A) Flux.log(App.java:23)
11. Thread: elastic-2 , A
12. 11:29:11 [elastic-2] INFO - | request(1) Flux.log(App.java:18)
13. 11:29:11 [main] INFO - | onNext(c) Flux.log(App.java:18)
14. 11:29:11 [elastic-3] INFO - onNext(B) Flux.log(App.java:21)
15. 11:29:11 [elastic-3] INFO - onNext(B) Flux.log(App.java:23)
16. Thread: elastic-3 , B
17. 11:29:11 [elastic-3] INFO - | request(1) Flux.log(App.java:18)
18. 11:29:11 [elastic-3] INFO - | onNext(d) Flux.log(App.java:18)
19. 11:29:11 [elastic-3] INFO - | onComplete() Flux.log(App.java:18)
20. 11:29:11 [elastic-3] INFO - onNext(C) Flux.log(App.java:21)
21. 11:29:11 [elastic-3] INFO - onNext(C) Flux.log(App.java:23)
22. Thread: elastic-3 , C
23. 11:29:11 [elastic-3] INFO - cancel() Flux.log(App.java:21)
24. 11:29:11 [elastic-3] INFO - onComplete() Flux.log(App.java:23)
25. 11:29:11 [elastic-3] INFO - | cancel() Flux.log(App.java:18)
问题:每个问题都是关于输出中的特定行(而不是代码中的一行)。我还添加了我对其中一些答案的回答,但我不确定我是否正确。
订阅时,订阅操作要求
unbounded
个元素。那么为什么事件:request(unbounded)
在管道中下降而不是上升?我的回答:unbounded
数量的请求上升到take
然后take
再次发送它。flatMap
发送cancel
信号。为什么take
不发送呢?
最后一个问题:输出的终端信号不止一个。这不是对反应流规范的评价吗?
那样的话,只会产生一个终端信号。
Flux.just("a", "b", "c", "d")
.log(null, Level.INFO, true) // line: 18
.flatMap(value ->
Mono.just(value.toUpperCase()).publishOn(Schedulers.elastic()), 2)
.log(null, Level.INFO, true) // line: 21
.take(3)
.log(null, Level.INFO, true) // line: 23
.subscribe(x ->
System.out.println("Thread: " + Thread.currentThread().getName() +
" , " + x), t -> {}, () -> System.out.println("Completed ""Only Once"));
这里棘手的部分是每个 Reactor 3 操作员都有自己的生命,他们都按照相同的规则进行游戏 - 发出 onComplete
以通知下游操作员不再有数据。
由于您有 .log()
运算符和三个不同的点,因此您将观察到来自 .just
、.flatMap
和 [=16= 的三个独立 onComplete
信号].
首先,您会从 .just
中看到 onComplete
,因为 .flatMap
的默认行为是“好的,我们先尝试请求 concurrency
个元素,然后让我们看看它是怎么回事',因为 .just
可能只产生(在你的情况下)4 个元素,在 2(在你的例子中是并发级别)请求的需求它将发出 2 onNext
并且在两个 request(1)
你会看到 onComplete
。反过来,emitted onComplete
让 .flatMap
知道当 4 个扁平流发出它们的 .onComplete
信号时,它将被允许向下游发出自己的 onComplete
。
反过来,下游是 .take(3)
运算符,它也在前三个元素之后发出自己的 onComplete
信号,而无需等待上游 onComplete
。由于在 .take
之后有 .log
运算符,此信号也将被记录。
最后,在你的流程中,你有 3 个独立的日志操作员,它们将记录来自 3 个独立操作员的 3 个独立 onComplete
,但尽管如此,最终终端 .subscribe
将只收到一个 onComplete
从第一个操作员到流程。
关于 .take
行为的小更新
.take
的中心思想是获取元素直到满足剩余计数。由于上游可能产生比请求更多的数据,我们需要有一种机制来防止发送更多数据。 Reactive-Streams 规范为我们提供的机制之一是 Subscription
上的协作。订阅有两种主要方法 - request
- 显示需求和 cancel
- 显示不再需要数据,即使请求的需求未得到满足。
在 .take
运算符的情况下,initial demand is Long.MAX_VALUE
, which considers as unbounded demand. Therefore, the only way to stop consuming potentially infinitive stream of data is to cancel 订阅,或者换句话说取消订阅
希望对你有帮助:)