如果创建热发布者,flux cache()、replay() 和 publish() 有什么区别?
What is the difference between flux cache(), replay() and publish() if creating a hot publisher?
如果创建热门发布者,flux cache()
、replay()
和 publish()
有什么区别?对于哪种用例,哪种运算符最适合?
以下示例重播了 3 种不同方法的所有 5 个元素。
cache()
:
var flux = Flux.fromStream(Stream.of(1,2,3,4,5))
.delayElements(Duration.ofSeconds(1)).cache();
flux.doOnNext(v -> System.out.println("First: " + v))
.subscribe();
Thread.sleep(5000);
flux.doOnNext(v -> System.out.println("Second: " + v))
.subscribe();
Thread.sleep(10000);
replay()
:
var flux = Flux.fromStream(Stream.of(1,2,3,4,5))
.delayElements(Duration.ofSeconds(1)).replay();
flux.doOnNext(v -> System.out.println("First: " + v))
.subscribe();
Thread.sleep(5000);
flux.doOnNext(v -> System.out.println("Second: " + v))
.subscribe();
flux.connect();
Thread.sleep(10000);
publish()
:
var flux = Flux.fromStream(Stream.of(1,2,3,4,5))
.delayElements(Duration.ofSeconds(1)).publish();
flux.doOnNext(v -> System.out.println("First: " + v))
.subscribe();
Thread.sleep(5000);
flux.doOnNext(v -> System.out.println("Second: " + v))
.subscribe();
flux.connect();
Thread.sleep(10000);
打印结果的一种变化:
First: 1
First: 2
First: 3
First: 4
Second: 1
Second: 2
Second: 3
Second: 4
First: 5
Second: 5
cache()
是 .replay().autoConnect(1)
的方便别名,即。一旦第一个订阅者进来,它就会为你执行connect()
。
但是由于它重播了整个历史,第二个订阅者仍然可以看到所有元素。
根据您的 replay()
和 publish()
示例,您可能认为两者之间没有区别。但那是因为你 connect()
在两个订阅者都订阅之后...
如果您要在第二个订阅者之前移动 connect()
调用,您会发现在 publish()
的情况下它看不到任何值。另一方面,replay()
会 向第二个订阅者重播 整个历史记录,尽管已经晚了。
如果创建热门发布者,flux cache()
、replay()
和 publish()
有什么区别?对于哪种用例,哪种运算符最适合?
以下示例重播了 3 种不同方法的所有 5 个元素。
cache()
:
var flux = Flux.fromStream(Stream.of(1,2,3,4,5))
.delayElements(Duration.ofSeconds(1)).cache();
flux.doOnNext(v -> System.out.println("First: " + v))
.subscribe();
Thread.sleep(5000);
flux.doOnNext(v -> System.out.println("Second: " + v))
.subscribe();
Thread.sleep(10000);
replay()
:
var flux = Flux.fromStream(Stream.of(1,2,3,4,5))
.delayElements(Duration.ofSeconds(1)).replay();
flux.doOnNext(v -> System.out.println("First: " + v))
.subscribe();
Thread.sleep(5000);
flux.doOnNext(v -> System.out.println("Second: " + v))
.subscribe();
flux.connect();
Thread.sleep(10000);
publish()
:
var flux = Flux.fromStream(Stream.of(1,2,3,4,5))
.delayElements(Duration.ofSeconds(1)).publish();
flux.doOnNext(v -> System.out.println("First: " + v))
.subscribe();
Thread.sleep(5000);
flux.doOnNext(v -> System.out.println("Second: " + v))
.subscribe();
flux.connect();
Thread.sleep(10000);
打印结果的一种变化:
First: 1
First: 2
First: 3
First: 4
Second: 1
Second: 2
Second: 3
Second: 4
First: 5
Second: 5
cache()
是 .replay().autoConnect(1)
的方便别名,即。一旦第一个订阅者进来,它就会为你执行connect()
。
但是由于它重播了整个历史,第二个订阅者仍然可以看到所有元素。
根据您的 replay()
和 publish()
示例,您可能认为两者之间没有区别。但那是因为你 connect()
在两个订阅者都订阅之后...
如果您要在第二个订阅者之前移动 connect()
调用,您会发现在 publish()
的情况下它看不到任何值。另一方面,replay()
会 向第二个订阅者重播 整个历史记录,尽管已经晚了。