如果创建热发布者,flux cache()、replay() 和 publish() 有什么区别?

What is the difference between flux cache(), replay() and publish() if creating a hot publisher?

如果创建热门发布者,flux cache()replay()publish() 有什么区别?对于哪种用例,哪种运算符最适合?

以下示例重播了 3 种不同方法的所有 5 个元素。

cache():

        var flux = Flux.fromStream(Stream.of(1,2,3,4,5))
                .delayElements(Duration.ofSeconds(1)).cache();

        flux.doOnNext(v -> System.out.println("First: " + v))
        .subscribe();

        Thread.sleep(5000);

        flux.doOnNext(v -> System.out.println("Second: " + v))
                .subscribe();

        Thread.sleep(10000);

replay():

        var flux = Flux.fromStream(Stream.of(1,2,3,4,5))
                .delayElements(Duration.ofSeconds(1)).replay();

        flux.doOnNext(v -> System.out.println("First: " + v))
        .subscribe();

        Thread.sleep(5000);

        flux.doOnNext(v -> System.out.println("Second: " + v))
                .subscribe();

        flux.connect();

        Thread.sleep(10000);

publish():

        var flux = Flux.fromStream(Stream.of(1,2,3,4,5))
                .delayElements(Duration.ofSeconds(1)).publish();

        flux.doOnNext(v -> System.out.println("First: " + v))
        .subscribe();

        Thread.sleep(5000);

        flux.doOnNext(v -> System.out.println("Second: " + v))
                .subscribe();

        flux.connect();

        Thread.sleep(10000);

打印结果的一种变化:

First: 1
First: 2
First: 3
First: 4
Second: 1
Second: 2
Second: 3
Second: 4
First: 5
Second: 5

cache().replay().autoConnect(1) 的方便别名,即。一旦第一个订阅者进来,它就会为你执行connect()

但是由于它重播了整个历史,第二个订阅者仍然可以看到所有元素。

根据您的 replay()publish() 示例,您可能认为两者之间没有区别。但那是因为你 connect() 在两个订阅者都订阅之后...

如果您要在第二个订阅者之前移动 connect() 调用,您会发现在 publish() 的情况下它看不到任何值。另一方面,replay() 向第二个订阅者重播 整个历史记录,尽管已经晚了。