DirectProcessor 和 UnicastProcessor 可以在不应该的时候订阅上游 Publisher。为什么?

DirectProcessor and UnicastProcessor can subscribe to upstream Publisher when they shouldn't. Why?

根据有关处理器的 Project Reactor 文档:

direct (DirectProcessor and UnicastProcessor): These processors can only push data through direct user action (calling their Sink's methods directly).

synchronous (EmitterProcessor and ReplayProcessor): These processors can push data both through user action and by subscribing to an upstream Publisher and synchronously draining it.

UnicastProcessor 应该无法订阅上游 Publisher。文档提供了直接用户 Sink 调用的示例:

UnicastProcessor<String> hotSource = UnicastProcessor.create();

Flux<String> hotFlux = hotSource.publish()
                                .autoConnect()
                                .map(String::toUpperCase);

hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));

hotSource.onNext("blue");

不过我已经尝试过直接将 UnicastProcessor 订阅到 Publisher 并且它有效。如文档中所述,这不应该是可能的。文档有误吗我是不是漏了什么?

在下面的示例中,我将 UnicastProcessor 订阅到上游 Flux 没有任何问题:

    val latch = CountDownLatch(20)

    val numberGenerator: Flux<Long> = counter(1000)
    val processor = UnicastProcessor.create<Long>()

    val connectableFlux = numberGenerator.subscribeWith(processor)

    connectableFlux.subscribe {
        logger.info("Element [{}]", it)
    }

    latch.await()

日志:

12:50:12.193 [main] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
12:50:12.196 [main] INFO reactor.Flux.Map.1 - request(unbounded)
12:50:13.203 [parallel-1] INFO reactor.Flux.Map.1 - onNext(0)
12:50:13.203 [parallel-1] INFO com.codependent.Test - Element [0]

是的,这方面的文档似乎已经过时了。甚至 DirectProcessor 也可以用作 Subscriber 并将信号传播给它自己的订阅者。

注意:您在代码段中使用了 EmitterProcessor,但它的行为仍然与 UnicastProcessor 相同。