Project Reactor 将一个发布者一分为二,至少有两个订阅者
Project Reactor split a publisher into two with a minimum of two subscribers
如何在 Reactor 中将发布者分成两部分,然后存在两个相同的数据流以在不同的流中处理下游?
所以我可以映射每个流并单独订阅每个流。
我在 API 中看不到任何暗示这是在 API 上的内容。
我需要等到两个订阅者都已启动并准备就绪后才能发布。
感谢您的输入,没有考虑清楚,当然只是有多个订阅者:
val flux = Flux.just("MyData1", "MyData2", "MyData3");
flux.doOnNext { println("Subscribing one$it") }.subscribe()
flux.doOnNext { println("Subscribing Two$it") }.subscribe()
将输出:
Subscribing oneMyData1
Subscribing oneMyData2
Subscribing oneMyData3
Subscribing TwoMyData1
Subscribing TwoMyData2
Subscribing TwoMyData3
如上所示,有 Share,但此 API 不允许设置最低订阅者数量,因此最好调用下面的函数,因为在我的情况下,我想等到我们有两个订户。文档指出
a Flux that upon first subscribe causes the source Flux to subscribe once, late subscribers might therefore miss items.
val flux = Flux.just("MyData1", "MyData2", "MyData3").publish().refCount(2)
这将导致以下输出,以确保在启动第二个订阅者时出现延迟时不会丢失消息。
Subscribing oneMyData1
Subscribing TwoMyData1
Subscribing oneMyData2
Subscribing TwoMyData2
Subscribing oneMyData3
Subscribing TwoMyData3
如何在 Reactor 中将发布者分成两部分,然后存在两个相同的数据流以在不同的流中处理下游?
所以我可以映射每个流并单独订阅每个流。
我在 API 中看不到任何暗示这是在 API 上的内容。
我需要等到两个订阅者都已启动并准备就绪后才能发布。
感谢您的输入,没有考虑清楚,当然只是有多个订阅者:
val flux = Flux.just("MyData1", "MyData2", "MyData3");
flux.doOnNext { println("Subscribing one$it") }.subscribe()
flux.doOnNext { println("Subscribing Two$it") }.subscribe()
将输出:
Subscribing oneMyData1
Subscribing oneMyData2
Subscribing oneMyData3
Subscribing TwoMyData1
Subscribing TwoMyData2
Subscribing TwoMyData3
如上所示,有 Share,但此 API 不允许设置最低订阅者数量,因此最好调用下面的函数,因为在我的情况下,我想等到我们有两个订户。文档指出
a Flux that upon first subscribe causes the source Flux to subscribe once, late subscribers might therefore miss items.
val flux = Flux.just("MyData1", "MyData2", "MyData3").publish().refCount(2)
这将导致以下输出,以确保在启动第二个订阅者时出现延迟时不会丢失消息。
Subscribing oneMyData1
Subscribing TwoMyData1
Subscribing oneMyData2
Subscribing TwoMyData2
Subscribing oneMyData3
Subscribing TwoMyData3