Reactor信号的基本问题

Basic questions about Reactor signals

我对以下代码的输出有一些疑问:

Flux.just("a", "b", "c", "d")
        .log(null, Level.INFO, true) // line: 18
        .flatMap(value ->
                Mono.just(value.toUpperCase()).publishOn(Schedulers.elastic()), 2)
        .log(null, Level.INFO, true) // line: 21
        .take(3)
        .log(null, Level.INFO, true) // line: 23
        .subscribe(x -> 
             System.out.println("Thread: " + Thread.currentThread().getName() +
                               " , " + x));

Thread.sleep(1000 * 1000);

输出:

1. 11:29:11 [main] INFO  - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)    Flux.log(App.java:18)
2. 11:29:11 [main] INFO  - onSubscribe(FluxFlatMap.FlatMapMain)     Flux.log(App.java:21)
3. 11:29:11 [main] INFO  - onSubscribe(FluxTake.TakeSubscriber)     Flux.log(App.java:23)
4. 11:29:11 [main] INFO  - request(unbounded)   Flux.log(App.java:23)
5. 11:29:11 [main] INFO  - request(unbounded)   Flux.log(App.java:21)
6. 11:29:11 [main] INFO  - | request(2)     Flux.log(App.java:18)
7. 11:29:11 [main] INFO  - | onNext(a)  Flux.log(App.java:18)
8. 11:29:11 [main] INFO  - | onNext(b)  Flux.log(App.java:18)
9. 11:29:11 [elastic-2] INFO  - onNext(A)   Flux.log(App.java:21)
10. 11:29:11 [elastic-2] INFO  - onNext(A)  Flux.log(App.java:23)
11. Thread: elastic-2 , A
12. 11:29:11 [elastic-2] INFO  - | request(1)   Flux.log(App.java:18)
13. 11:29:11 [main] INFO  - | onNext(c)     Flux.log(App.java:18)
14. 11:29:11 [elastic-3] INFO  - onNext(B)  Flux.log(App.java:21)
15. 11:29:11 [elastic-3] INFO  - onNext(B)  Flux.log(App.java:23)
16. Thread: elastic-3 , B
17. 11:29:11 [elastic-3] INFO  - | request(1)   Flux.log(App.java:18)
18. 11:29:11 [elastic-3] INFO  - | onNext(d)    Flux.log(App.java:18)
19. 11:29:11 [elastic-3] INFO  - | onComplete()     Flux.log(App.java:18)
20. 11:29:11 [elastic-3] INFO  - onNext(C)  Flux.log(App.java:21)
21. 11:29:11 [elastic-3] INFO  - onNext(C)  Flux.log(App.java:23)
22. Thread: elastic-3 , C
23. 11:29:11 [elastic-3] INFO  - cancel()   Flux.log(App.java:21)
24. 11:29:11 [elastic-3] INFO  - onComplete()   Flux.log(App.java:23)
25. 11:29:11 [elastic-3] INFO  - | cancel()     Flux.log(App.java:18)

问题:每个问题都是关于输出中的特定行(而不是代码中的一行)。我还添加了我对其中一些答案的回答,但我不确定我是否正确。

  1. 订阅时,订阅操作要求unbounded个元素。那么为什么事件:request(unbounded) 在管道中下降而不是上升?我的回答:unbounded 数量的请求上升到 take 然后 take 再次发送它。

  2. flatMap发送cancel信号。为什么 take 不发送呢?

最后一个问题:输出的终端信号不止一个。这不是对反应流规范的评价吗?

那样的话,只会产生一个终端信号。

Flux.just("a", "b", "c", "d")
            .log(null, Level.INFO, true) // line: 18
            .flatMap(value ->
                    Mono.just(value.toUpperCase()).publishOn(Schedulers.elastic()), 2)
            .log(null, Level.INFO, true) // line: 21
            .take(3)
            .log(null, Level.INFO, true) // line: 23
            .subscribe(x ->
                    System.out.println("Thread: " + Thread.currentThread().getName() +
 " , " + x), t -> {}, () -> System.out.println("Completed ""Only Once"));

这里棘手的部分是每个 Reactor 3 操作员都有自己的生命,他们都按照相同的规则进行游戏 - 发出 onComplete 以通知下游操作员不再有数据。

由于您有 .log() 运算符和三个不同的点,因此您将观察到来自 .just.flatMap 和 [=16= 的三个独立 onComplete 信号].

首先,您会从 .just 中看到 onComplete,因为 .flatMap 的默认行为是“好的,我们先尝试请求 concurrency 个元素,然后让我们看看它是怎么回事',因为 .just 可能只产生(在你的情况下)4 个元素,在 2(在你的例子中是并发级别)请求的需求它将发出 2 onNext 并且在两个 request(1) 你会看到 onComplete。反过来,emitted onComplete.flatMap 知道当 4 个扁平流发出它们的 .onComplete 信号时,它将被允许向下游发出自己的 onComplete。 反过来,下游是 .take(3) 运算符,它也在前三个元素之后发出自己的 onComplete 信号,而无需等待上游 onComplete。由于在 .take 之后有 .log 运算符,此信号也将被记录。 最后,在你的流程中,你有 3 个独立的日志操作员,它们将记录来自 3 个独立操作员的 3 个独立 onComplete,但尽管如此,最终终端 .subscribe 将只收到一个 onComplete从第一个操作员到流程。

关于 .take 行为的小更新

.take的中心思想是获取元素直到满足剩余计数。由于上游可能产生比请求更多的数据,我们需要有一种机制来防止发送更多数据。 Reactive-Streams 规范为我们提供的机制之一是 Subscription 上的协作。订阅有两种主要方法 - request - 显示需求和 cancel - 显示不再需要数据,即使请求的需求未得到满足。 在 .take 运算符的情况下,initial demand is Long.MAX_VALUE, which considers as unbounded demand. Therefore, the only way to stop consuming potentially infinitive stream of data is to cancel 订阅,或者换句话说取消订阅

希望对你有帮助:)