我们可以多次发布流吗?
can we publish a stream more then once?
以下所有代码均不打印任何内容。为什么?
ConnectableFlux<Integer> publish = Flux.just(1)
.publish();
ConnectableFlux<Integer> publish1 = Flux.just(2)
.flatMap(x -> publish)
.publish();
publish1.subscribe(System.out::println, System.out::println, System.out::println);
publish1.connect();
ConnectableFlux<Integer> publish1 = Flux.just(2)
.publish()
.publish();
publish1.subscribe(System.out::println, System.out::println, System.out::println);
publish1.connect();
ConnectableFlux<Integer> publish1 = Flux.just(2)
.publish()
.doOnNext(System.out::println)
.publish();
publish1.subscribe(System.out::println, System.out::println, System.out::println);
publish1.connect();
不要忘记为每个 ConnectableFlux
提供一个 .connect
ion
在所有这些示例中,都缺少 .connect
ion 语句。
对于第一种情况,要使其正常工作,我们必须先 .connect
到第一个 publish
ConnectableFlux
:
ConnectableFlux<Integer> publish = Flux.just(1)
.publish();
ConnectableFlux<Integer> publish1 = Flux.just(2)
.flatMap(x -> publish)
.publish();
publish1.subscribe(System.out::println, System.out::println, System.out::println);
publish1.connect();
publish.connect();
对于以下两个示例,我们有类似的东西。当我们使用 Flux.just(...).publish().publish()
时,我们创建了两个 ConnectableFlux
。这里的问题是第一个被删除了。如果必须进行后续的 .publish
ing(这是非常不合逻辑的),我们可以使用以下技术来避免删除之前的 ConnectableFlux
es:
ConnectableFlux<Integer> publish1 = Flux.just(2)
.publish()
.autoConnect() // or .autoConnect(0)
.doOnNext(System.out::println)
.publish();
publish1.subscribe(System.out::println, System.out::println, System.out::println);
publish1.connect();
在该示例中,我们使用 .autoConnect()
运算符,在 .autoConnect(0)
的情况下,它只是 ConnectableFlux#connect
和 return this;
语句的组合。在 .autoConnect(>0)
的情况下,使用了一些对初始源的延迟订阅,这听起来像“当且仅当我们获得 N 订阅者时连接到初始源"
以下所有代码均不打印任何内容。为什么?
ConnectableFlux<Integer> publish = Flux.just(1)
.publish();
ConnectableFlux<Integer> publish1 = Flux.just(2)
.flatMap(x -> publish)
.publish();
publish1.subscribe(System.out::println, System.out::println, System.out::println);
publish1.connect();
ConnectableFlux<Integer> publish1 = Flux.just(2)
.publish()
.publish();
publish1.subscribe(System.out::println, System.out::println, System.out::println);
publish1.connect();
ConnectableFlux<Integer> publish1 = Flux.just(2)
.publish()
.doOnNext(System.out::println)
.publish();
publish1.subscribe(System.out::println, System.out::println, System.out::println);
publish1.connect();
不要忘记为每个 ConnectableFlux
提供一个 .connect
ion
在所有这些示例中,都缺少 .connect
ion 语句。
对于第一种情况,要使其正常工作,我们必须先 .connect
到第一个 publish
ConnectableFlux
:
ConnectableFlux<Integer> publish = Flux.just(1)
.publish();
ConnectableFlux<Integer> publish1 = Flux.just(2)
.flatMap(x -> publish)
.publish();
publish1.subscribe(System.out::println, System.out::println, System.out::println);
publish1.connect();
publish.connect();
对于以下两个示例,我们有类似的东西。当我们使用 Flux.just(...).publish().publish()
时,我们创建了两个 ConnectableFlux
。这里的问题是第一个被删除了。如果必须进行后续的 .publish
ing(这是非常不合逻辑的),我们可以使用以下技术来避免删除之前的 ConnectableFlux
es:
ConnectableFlux<Integer> publish1 = Flux.just(2)
.publish()
.autoConnect() // or .autoConnect(0)
.doOnNext(System.out::println)
.publish();
publish1.subscribe(System.out::println, System.out::println, System.out::println);
publish1.connect();
在该示例中,我们使用 .autoConnect()
运算符,在 .autoConnect(0)
的情况下,它只是 ConnectableFlux#connect
和 return this;
语句的组合。在 .autoConnect(>0)
的情况下,使用了一些对初始源的延迟订阅,这听起来像“当且仅当我们获得 N 订阅者时连接到初始源"