Reactor Flux 重播(int history)方法没有按预期工作

Reactor Flux replay(int history) method not working as expected

我正在尝试使用具有以下特征的 Project Reactor 创建一个 Flux 示例:

然后我对这个样本进行了编码:

import java.time.Duration;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class FluxTest {

  public static void main(String[] args) {
    final ConnectableFlux<Integer> publisher = Flux.range(1, 20)
      .delayElements(Duration.ofSeconds(1))
      .replay(8);

    publisher.publishOn(Schedulers.newSingle("fast"))
      .subscribe(i -> {
        System.out.println("Fast subscriber - Received " + i);
        sleep(1);
      });

    publisher.publishOn(Schedulers.newSingle("slow"))
      .subscribe(i -> {
        System.out.println("Slow subscriber - Received " + i);
        sleep(5);
      });

    publisher.connect();
  }

  private static void sleep(int seconds) {
    try {
      Thread.sleep(seconds * 1000L);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

根据 the documentationreplay(int history) 方法,我预计几秒钟后,第二个消费者(较慢的消费者)会开始丢失轨道,但事实并非如此。以控制台输出的这一部分为例:

...
Fast subscriber - Received 14
Fast subscriber - Received 15
Slow subscriber - Received 4
Fast subscriber - Received 16
Fast subscriber - Received 17

我预计慢速订阅者无法接收 4,因为该元素不应再存在于历史记录中(15 - 8 = 7,这应该是最后一个)。

请注意,如果我使用 replay(8, Duration.ofSeconds(8)) 方法,那么我会得到我期望的结果:

...
Fast subscriber - Received 14
Fast subscriber - Received 15
Slow subscriber - Received 8
Fast subscriber - Received 16
Fast subscriber - Received 17

我想我在这里遗漏了一些重要的东西,但我不知道它是什么。

replay(8) 能够 重播在 订阅者订阅 之前 发出的 8 个元素。对于之后出现的元素,它们将直接转发给订阅者。此处您在连接之前订阅 slow,因此重播缓冲区的大小并不重要。

您的慢速订阅者在专用线程上休眠,所以发生的事情是 publishOn 确实接收了所有数据,将其放入内部 Queue 并在 [=11= 上安排自己] 线程排空该队列,在每次迭代中被阻塞 5 秒的排空循环中。

不过,操作员已经看到并能够处理所有数据。