Why does a UnicastProcessor plus ConnectableFlux send previously emitted items downstream with autoConnect() but not with connect()?
I have this upstream publisher, which emits one number per second:
private fun counter(emissionIntervalMillis: Long) =
Flux.interval(Duration.ofMillis(emissionIntervalMillis))
.map { it }.log()
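Consider this implementation, in which a UnicastProcessor subscribes to the previous Flux. In addition, a ConnectableFlux is created with processor.publish() and turned back into a plain Flux with autoConnect(). Finally, I subscribe to the result: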
val latch = CountDownLatch(15)
val numberGenerator: Flux<Long> = counter(1000)
val processor = UnicastProcessor.create<Long>()
numberGenerator.subscribeWith(processor)
val connectableFlux = processor.doOnSubscribe { println("subscribed!") }.publish().autoConnect()
Thread.sleep(5000)
connectableFlux.subscribe {
logger.info("Element [{}]", it)
latch.countDown()
}
latch.await()
Logs:
15:58:26.941 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
15:58:26.967 [main] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
15:58:26.969 [main] INFO reactor.Flux.Map.1 - request(unbounded)
15:58:27.973 [parallel-1] INFO reactor.Flux.Map.1 - onNext(0)
15:58:28.973 [parallel-1] INFO reactor.Flux.Map.1 - onNext(1)
15:58:29.975 [parallel-1] INFO reactor.Flux.Map.1 - onNext(2)
15:58:30.974 [parallel-1] INFO reactor.Flux.Map.1 - onNext(3)
15:58:31.974 [parallel-1] INFO reactor.Flux.Map.1 - onNext(4)
subscribed!
15:58:31.979 [main] INFO com.codependent.processors.Tests - Element [0]
15:58:31.980 [main] INFO com.codependent.processors.Tests - Element [1]
15:58:31.980 [main] INFO com.codependent.processors.Tests - Element [2]
15:58:31.980 [main] INFO com.codependent.processors.Tests - Element [3]
15:58:31.980 [main] INFO com.codependent.processors.Tests - Element [4]
15:58:32.972 [parallel-1] INFO reactor.Flux.Map.1 - onNext(5)
15:58:32.972 [parallel-1] INFO com.codependent.processors.Tests - Element [5]
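As you can see, once connectableFlux has a subscriber, that subscriber receives the previously generated items, which were buffered by the UnicastProcessor. I assume this is the expected behaviour: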
if you push any amount of data through it while its Subscriber has not
yet requested data, it will buffer all of the data.
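The buffering described in that quote can be reproduced without any timing involved. The following is only a minimal sketch, assuming reactor-core is on the classpath; the function name bufferedBeforeSubscribe is made up for illustration, and items are pushed by hand with onNext instead of coming from Flux.interval:

```kotlin
import reactor.core.publisher.UnicastProcessor

// Hypothetical helper: push items before anyone subscribes,
// then subscribe and collect whatever the processor delivers.
fun bufferedBeforeSubscribe(): List<Int> {
    val processor = UnicastProcessor.create<Int>()

    // No subscriber yet, so the processor has to hold on to these items.
    processor.onNext(1)
    processor.onNext(2)
    processor.onNext(3)

    val received = mutableListOf<Int>()
    // The first (and only allowed) subscriber arrives late.
    processor.subscribe { received += it }
    return received
}

fun main() {
    println(bufferedBeforeSubscribe())
}
```

The late subscriber should see all three items, which matches Element [0] to Element [4] arriving in a burst in the logs above.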
Now, instead of autoConnect(), I use connect():
val latch = CountDownLatch(15)
val numberGenerator: Flux<Long> = counter(1000)
val processor = UnicastProcessor.create<Long>()
numberGenerator.subscribeWith(processor)
val connectableFlux = processor.doOnSubscribe { println("subscribed!") }.publish()
connectableFlux.connect()
Thread.sleep(5000)
connectableFlux.subscribe {
logger.info("Element [{}]", it)
latch.countDown()
}
latch.await()
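Now the result is completely different: the subscriber does not get the items that should have been buffered by the UnicastProcessor. Can someone explain the difference?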
16:08:44.299 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
16:08:44.324 [main] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
16:08:44.326 [main] INFO reactor.Flux.Map.1 - request(unbounded)
subscribed!
16:08:45.330 [parallel-1] INFO reactor.Flux.Map.1 - onNext(0)
16:08:46.329 [parallel-1] INFO reactor.Flux.Map.1 - onNext(1)
16:08:47.329 [parallel-1] INFO reactor.Flux.Map.1 - onNext(2)
16:08:48.331 [parallel-1] INFO reactor.Flux.Map.1 - onNext(3)
16:08:49.330 [parallel-1] INFO reactor.Flux.Map.1 - onNext(4)
16:08:50.328 [parallel-1] INFO reactor.Flux.Map.1 - onNext(5)
16:08:50.328 [parallel-1] INFO com.codependent.processors.Tests - Element [5]
16:08:51.332 [parallel-1] INFO reactor.Flux.Map.1 - onNext(6)
16:08:51.332 [parallel-1] INFO com.codependent.processors.Tests - Element [6]
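After re-reading the documentation, I found that autoConnect() accepts the minimum number of subscribers required before it subscribes to the upstream. Changing it to autoConnect(0) has the same effect as connect(): the earlier items are not delivered to the subscriber: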
val latch = CountDownLatch(15)
val numberGenerator: Flux<Long> = counter(1000)
val processor = UnicastProcessor.create<Long>()
numberGenerator.subscribeWith(processor)
val connectableFlux = processor.doOnSubscribe { println("subscribed!") }.log().publish().autoConnect(0)
Thread.sleep(5000)
connectableFlux.subscribe {
logger.info("Element [{}]", it)
latch.countDown()
}
latch.await()
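It seems that, because connectableFlux is already connected, the processor receives the onSubscribe signal right away and starts handing its items over; since connectableFlux has no actual subscribers at that point, those items are discarded.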
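To check that guess without waiting on a timer, the same sequence of events can be driven by hand. This is a sketch under assumptions, not a statement about Reactor's internals: lateSubscriberAfterConnect is a hypothetical name, items are pushed manually with onNext, and which items reach the late subscriber may depend on the reactor-core version in use, so the code prints the result instead of asserting it.

```kotlin
import reactor.core.publisher.UnicastProcessor

// Hypothetical helper: connect first, emit, then subscribe late.
fun lateSubscriberAfterConnect(): List<Int> {
    val processor = UnicastProcessor.create<Int>()
    val published = processor.publish()

    // connect() makes publish() the processor's single subscriber immediately.
    published.connect()

    // Emitted while publish() itself has no downstream subscribers.
    (0..4).forEach { processor.onNext(it) }

    val received = mutableListOf<Int>()
    published.subscribe { received += it }

    // Emitted after the late subscriber has arrived.
    processor.onNext(5)
    return received
}

fun main() {
    // With the behaviour shown in the logs above, only the last item would be expected here.
    println(lateSubscriberAfterConnect())
}
```

The point of the sketch is that after connect() the processor is no longer the component holding the items: they have already been passed to publish(), so what happens to them is decided there.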
Changing publish() to replay() makes the subscriber receive the items from the beginning, as stated in the docs:
val connectableFlux = processor.doOnSubscribe { println("subscribed!") }.log().replay().autoConnect(0)
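The replay() variant can be tried the same way. Again this is only a sketch, assuming reactor-core is available; lateSubscriberWithReplay is a hypothetical name and the items are pushed by hand instead of coming from the interval-based counter:

```kotlin
import reactor.core.publisher.UnicastProcessor

// Hypothetical helper: replay() caches items for subscribers that arrive late.
fun lateSubscriberWithReplay(): List<Int> {
    val processor = UnicastProcessor.create<Int>()

    // autoConnect(0) connects to the processor immediately, before any subscriber exists.
    val replayed = processor.replay().autoConnect(0)

    // Emitted while there are no downstream subscribers; replay() keeps them.
    (0..4).forEach { processor.onNext(it) }

    val received = mutableListOf<Int>()
    replayed.subscribe { received += it }
    return received
}

fun main() {
    println(lateSubscriberWithReplay())
}
```

Here the late subscriber should receive every item from 0 onward, because replay() without arguments retains the whole history. That history is unbounded, so for a long-running source a bounded overload such as replay(history) may be the safer choice.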