Reactor Flux 重播(int history)方法没有按预期工作
Reactor Flux replay(int history) method not working as expected
我正在尝试使用具有以下特征的 Project Reactor 创建一个 Flux
示例:
- 单个热可观察对象,每秒发出一个项目。
- 两个订阅者,每个订阅者都使用发布者的一个单独线程。
- 调用
replay()
时的历史记录有限,因此如果其中一位订阅者速度太慢,将会错过一些项目。
然后我对这个样本进行了编码:
import java.time.Duration;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class FluxTest {
public static void main(String[] args) {
final ConnectableFlux<Integer> publisher = Flux.range(1, 20)
.delayElements(Duration.ofSeconds(1))
.replay(8);
publisher.publishOn(Schedulers.newSingle("fast"))
.subscribe(i -> {
System.out.println("Fast subscriber - Received " + i);
sleep(1);
});
publisher.publishOn(Schedulers.newSingle("slow"))
.subscribe(i -> {
System.out.println("Slow subscriber - Received " + i);
sleep(5);
});
publisher.connect();
}
private static void sleep(int seconds) {
try {
Thread.sleep(seconds * 1000L);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
根据 the documentation 的 replay(int history)
方法,我预计几秒钟后,第二个消费者(较慢的消费者)会开始丢失轨道,但事实并非如此。以控制台输出的这一部分为例:
...
Fast subscriber - Received 14
Fast subscriber - Received 15
Slow subscriber - Received 4
Fast subscriber - Received 16
Fast subscriber - Received 17
我预计慢速订阅者无法接收 4
,因为该元素不应再存在于历史记录中(15 - 8 = 7,这应该是最后一个)。
请注意,如果我使用 replay(8, Duration.ofSeconds(8))
方法,那么我会得到我期望的结果:
...
Fast subscriber - Received 14
Fast subscriber - Received 15
Slow subscriber - Received 8
Fast subscriber - Received 16
Fast subscriber - Received 17
我想我在这里遗漏了一些重要的东西,但我不知道它是什么。
replay(8)
能够 重播在 订阅者订阅 之前 发出的 8 个元素。对于之后出现的元素,它们将直接转发给订阅者。此处您在连接之前订阅 slow
,因此重播缓冲区的大小并不重要。
您的慢速订阅者在专用线程上休眠,所以发生的事情是 publishOn
确实接收了所有数据,将其放入内部 Queue
并在 [=11= 上安排自己] 线程排空该队列,在每次迭代中被阻塞 5 秒的排空循环中。
不过,操作员已经看到并能够处理所有数据。
我正在尝试使用具有以下特征的 Project Reactor 创建一个 Flux
示例:
- 单个热可观察对象,每秒发出一个项目。
- 两个订阅者,每个订阅者都使用发布者的一个单独线程。
- 调用
replay()
时的历史记录有限,因此如果其中一位订阅者速度太慢,将会错过一些项目。
然后我对这个样本进行了编码:
import java.time.Duration;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class FluxTest {
public static void main(String[] args) {
final ConnectableFlux<Integer> publisher = Flux.range(1, 20)
.delayElements(Duration.ofSeconds(1))
.replay(8);
publisher.publishOn(Schedulers.newSingle("fast"))
.subscribe(i -> {
System.out.println("Fast subscriber - Received " + i);
sleep(1);
});
publisher.publishOn(Schedulers.newSingle("slow"))
.subscribe(i -> {
System.out.println("Slow subscriber - Received " + i);
sleep(5);
});
publisher.connect();
}
private static void sleep(int seconds) {
try {
Thread.sleep(seconds * 1000L);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
根据 the documentation 的 replay(int history)
方法,我预计几秒钟后,第二个消费者(较慢的消费者)会开始丢失轨道,但事实并非如此。以控制台输出的这一部分为例:
...
Fast subscriber - Received 14
Fast subscriber - Received 15
Slow subscriber - Received 4
Fast subscriber - Received 16
Fast subscriber - Received 17
我预计慢速订阅者无法接收 4
,因为该元素不应再存在于历史记录中(15 - 8 = 7,这应该是最后一个)。
请注意,如果我使用 replay(8, Duration.ofSeconds(8))
方法,那么我会得到我期望的结果:
...
Fast subscriber - Received 14
Fast subscriber - Received 15
Slow subscriber - Received 8
Fast subscriber - Received 16
Fast subscriber - Received 17
我想我在这里遗漏了一些重要的东西,但我不知道它是什么。
replay(8)
能够 重播在 订阅者订阅 之前 发出的 8 个元素。对于之后出现的元素,它们将直接转发给订阅者。此处您在连接之前订阅 slow
,因此重播缓冲区的大小并不重要。
您的慢速订阅者在专用线程上休眠,所以发生的事情是 publishOn
确实接收了所有数据,将其放入内部 Queue
并在 [=11= 上安排自己] 线程排空该队列,在每次迭代中被阻塞 5 秒的排空循环中。
不过,操作员已经看到并能够处理所有数据。