我们可以多次发布流吗?

can we publish a stream more then once?

以下所有代码均不打印任何内容。为什么?

ConnectableFlux<Integer> publish = Flux.just(1)
        .publish();

ConnectableFlux<Integer> publish1 = Flux.just(2)
        .flatMap(x -> publish)
        .publish();

publish1.subscribe(System.out::println, System.out::println, System.out::println);
publish1.connect();

ConnectableFlux<Integer> publish1 = Flux.just(2)
        .publish()
        .publish();

publish1.subscribe(System.out::println, System.out::println, System.out::println);
publish1.connect();

ConnectableFlux<Integer> publish1 = Flux.just(2)
        .publish()
        .doOnNext(System.out::println)
        .publish();

publish1.subscribe(System.out::println, System.out::println, System.out::println);
publish1.connect();

不要忘记为每个 ConnectableFlux

提供一个 .connection

在所有这些示例中,都缺少 .connection 语句。

对于第一种情况,要使其正常工作,我们必须先 .connect 到第一个 publish ConnectableFlux

ConnectableFlux<Integer> publish = Flux.just(1)
        .publish();

ConnectableFlux<Integer> publish1 = Flux.just(2)
        .flatMap(x -> publish)
        .publish();

publish1.subscribe(System.out::println, System.out::println, System.out::println);
publish1.connect();
publish.connect();

对于以下两个示例,我们有类似的东西。当我们使用 Flux.just(...).publish().publish() 时,我们创建了两个 ConnectableFlux。这里的问题是第一个被删除了。如果必须进行后续的 .publishing(这是非常不合逻辑的),我们可以使用以下技术来避免删除之前的 ConnectableFluxes:

ConnectableFlux<Integer> publish1 = Flux.just(2)
        .publish()
        .autoConnect() // or .autoConnect(0)
        .doOnNext(System.out::println)
        .publish();

publish1.subscribe(System.out::println, System.out::println, System.out::println);
publish1.connect();

在该示例中,我们使用 .autoConnect() 运算符,在 .autoConnect(0) 的情况下,它只是 ConnectableFlux#connectreturn this; 语句的组合。在 .autoConnect(>0) 的情况下,使用了一些对初始源的延迟订阅,这听起来像“当且仅当我们获得 N 订阅者时连接到初始源"