DirectProcessor 和 UnicastProcessor 可以在不应该的时候订阅上游 Publisher。为什么?
DirectProcessor and UnicastProcessor can subscribe to upstream Publisher when they shouldn't. Why?
根据有关处理器的 Project Reactor 文档:
direct (DirectProcessor and UnicastProcessor): These processors can
only push data through direct user action (calling their Sink's
methods directly).
synchronous (EmitterProcessor and ReplayProcessor): These processors
can push data both through user action and by subscribing to an
upstream Publisher and synchronously draining it.
UnicastProcessor
应该无法订阅上游 Publisher
。文档提供了直接用户 Sink 调用的示例:
UnicastProcessor<String> hotSource = UnicastProcessor.create();
Flux<String> hotFlux = hotSource.publish()
.autoConnect()
.map(String::toUpperCase);
hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));
hotSource.onNext("blue");
不过我已经尝试过直接将 UnicastProcessor
订阅到 Publisher
并且它有效。如文档中所述,这不应该是可能的。文档有误吗我是不是漏了什么?
在下面的示例中,我将 UnicastProcessor
订阅到上游 Flux
没有任何问题:
val latch = CountDownLatch(20)
val numberGenerator: Flux<Long> = counter(1000)
val processor = UnicastProcessor.create<Long>()
val connectableFlux = numberGenerator.subscribeWith(processor)
connectableFlux.subscribe {
logger.info("Element [{}]", it)
}
latch.await()
日志:
12:50:12.193 [main] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
12:50:12.196 [main] INFO reactor.Flux.Map.1 - request(unbounded)
12:50:13.203 [parallel-1] INFO reactor.Flux.Map.1 - onNext(0)
12:50:13.203 [parallel-1] INFO com.codependent.Test - Element [0]
是的,这方面的文档似乎已经过时了。甚至 DirectProcessor
也可以用作 Subscriber
并将信号传播给它自己的订阅者。
注意:您在代码段中使用了 EmitterProcessor
,但它的行为仍然与 UnicastProcessor
相同。
根据有关处理器的 Project Reactor 文档:
direct (DirectProcessor and UnicastProcessor): These processors can only push data through direct user action (calling their Sink's methods directly).
synchronous (EmitterProcessor and ReplayProcessor): These processors can push data both through user action and by subscribing to an upstream Publisher and synchronously draining it.
UnicastProcessor
应该无法订阅上游 Publisher
。文档提供了直接用户 Sink 调用的示例:
UnicastProcessor<String> hotSource = UnicastProcessor.create();
Flux<String> hotFlux = hotSource.publish()
.autoConnect()
.map(String::toUpperCase);
hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));
hotSource.onNext("blue");
不过我已经尝试过直接将 UnicastProcessor
订阅到 Publisher
并且它有效。如文档中所述,这不应该是可能的。文档有误吗我是不是漏了什么?
在下面的示例中,我将 UnicastProcessor
订阅到上游 Flux
没有任何问题:
val latch = CountDownLatch(20)
val numberGenerator: Flux<Long> = counter(1000)
val processor = UnicastProcessor.create<Long>()
val connectableFlux = numberGenerator.subscribeWith(processor)
connectableFlux.subscribe {
logger.info("Element [{}]", it)
}
latch.await()
日志:
12:50:12.193 [main] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
12:50:12.196 [main] INFO reactor.Flux.Map.1 - request(unbounded)
12:50:13.203 [parallel-1] INFO reactor.Flux.Map.1 - onNext(0)
12:50:13.203 [parallel-1] INFO com.codependent.Test - Element [0]
是的,这方面的文档似乎已经过时了。甚至 DirectProcessor
也可以用作 Subscriber
并将信号传播给它自己的订阅者。
注意:您在代码段中使用了 EmitterProcessor
,但它的行为仍然与 UnicastProcessor
相同。