为什么 UnicastProcessor 加上 ConnectableFlux 在 autoConnect 上向下游发送先前发出的项目而不是在 connect()

Why does a UnicastProcessor plus ConnectableFlux send previously emitted items downstream on autoConnect but not on connect()

我有这个每秒发出一个数字的上游发布者:

private fun counter(emissionIntervalMillis: Long) =
        Flux.interval(Duration.ofMillis(emissionIntervalMillis))
                .map { it }.log()

考虑这个实现,其中 UnicastProcessor 订阅了前一个 Flux。此外还有一个 ConnectableFluxprocessor.publish().autoConnect() 生成。最后我订阅了这个ConnectableFlux:

    val latch = CountDownLatch(15)

    val numberGenerator: Flux<Long> = counter(1000)
    val processor = UnicastProcessor.create<Long>()
    numberGenerator.subscribeWith(processor)
    val connectableFlux = processor.doOnSubscribe { println("subscribed!") }.publish().autoConnect()

    Thread.sleep(5000)

    connectableFlux.subscribe {
        logger.info("Element [{}]", it)
        latch.countDown()
    }

    latch.await()

日志:

15:58:26.941 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
15:58:26.967 [main] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
15:58:26.969 [main] INFO reactor.Flux.Map.1 - request(unbounded)
15:58:27.973 [parallel-1] INFO reactor.Flux.Map.1 - onNext(0)
15:58:28.973 [parallel-1] INFO reactor.Flux.Map.1 - onNext(1)
15:58:29.975 [parallel-1] INFO reactor.Flux.Map.1 - onNext(2)
15:58:30.974 [parallel-1] INFO reactor.Flux.Map.1 - onNext(3)
15:58:31.974 [parallel-1] INFO reactor.Flux.Map.1 - onNext(4)
subscribed!
15:58:31.979 [main] INFO com.codependent.processors.Tests - Element [0]
15:58:31.980 [main] INFO com.codependent.processors.Tests - Element [1]
15:58:31.980 [main] INFO com.codependent.processors.Tests - Element [2]
15:58:31.980 [main] INFO com.codependent.processors.Tests - Element [3]
15:58:31.980 [main] INFO com.codependent.processors.Tests - Element [4]
15:58:32.972 [parallel-1] INFO reactor.Flux.Map.1 - onNext(5)
15:58:32.972 [parallel-1] INFO com.codependent.processors.Tests - Element [5]

如您所见,当 connectableFlux 有订阅者时,它会获取先前生成的项目,这些项目由 UnicastProcessor 缓存。我想这是 expected behaviour:

if you push any amount of data through it while its Subscriber has not yet requested data, it will buffer all of the data.

现在,我不再使用 autoConnect,而是使用 connect():

    val latch = CountDownLatch(15)

    val numberGenerator: Flux<Long> = counter(1000)
    val processor = UnicastProcessor.create<Long>()
    numberGenerator.subscribeWith(processor)
    val connectableFlux = processor.doOnSubscribe { println("subscribed!") }.publish()
    connectableFlux.connect()

    Thread.sleep(5000)

    connectableFlux.subscribe {
        logger.info("Element [{}]", it)
        latch.countDown()
    } 

现在的结果完全不同,订阅者没有得到本应由 UnicastProcessor 缓存的项目。有人可以解释一下区别吗?

16:08:44.299 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
16:08:44.324 [main] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
16:08:44.326 [main] INFO reactor.Flux.Map.1 - request(unbounded)
subscribed!
16:08:45.330 [parallel-1] INFO reactor.Flux.Map.1 - onNext(0)
16:08:46.329 [parallel-1] INFO reactor.Flux.Map.1 - onNext(1)
16:08:47.329 [parallel-1] INFO reactor.Flux.Map.1 - onNext(2)
16:08:48.331 [parallel-1] INFO reactor.Flux.Map.1 - onNext(3)
16:08:49.330 [parallel-1] INFO reactor.Flux.Map.1 - onNext(4)
16:08:50.328 [parallel-1] INFO reactor.Flux.Map.1 - onNext(5)
16:08:50.328 [parallel-1] INFO com.codependent.processors.Tests - Element [5]
16:08:51.332 [parallel-1] INFO reactor.Flux.Map.1 - onNext(6)
16:08:51.332 [parallel-1] INFO com.codependent.processors.Tests - Element [6]

重新阅读文档后,我发现 autoConnect() 可以传递订阅上游所需的最少订阅者数量。改成autoConnect(0)connect()效果一样,不把之前的item传递给订阅者:

    val latch = CountDownLatch(15)

    val numberGenerator: Flux<Long> = counter(1000)
    val processor = UnicastProcessor.create<Long>()
    numberGenerator.subscribeWith(processor)
    val connectableFlux = processor.doOnSubscribe { println("subscribed!") }.log().publish().autoConnect(0)

    Thread.sleep(5000)

    connectableFlux.subscribe {
        logger.info("Element [{}]", it)
        latch.countDown()
    }

    latch.await()

似乎由于 connectableFlux 已准备就绪(已连接),处理器收到 OnSubscribe 信号,并且由于 connectableFlux 没有任何实际订阅者,因此它丢弃了这些项目。

publish() 更改为 replay() 将使订阅者从头开始获取项目,如 stated in the doc.

val connectableFlux = processor.doOnSubscribe { println("subscribed!") }.log().replay().autoConnect(0)