等待订阅者订阅的给定(可重置)时间
Wait given (reset-able) time for subscribers to subscribe
在反应堆中有一种方法可以等待特定数量的订阅者,然后发布者才应该开始发射。
我想实现一个稍微不同的场景:
- 订阅者订阅
- 发布者等待另一个订阅
- 订阅时间少于预期时间 t
- 计时器重置,发布者继续等待另一个订阅者
- 给定时间内没有订阅
- 发布者开始 emmit
如果等待周期持续时间过长(长于时间 t2),则发布者无论如何都会开始发送。
怎么做?
没有 baked-in 运算符可以做到这一点,但您应该能够使用标准 publish()
-connect()
通过计划任务和 AtomicInteger
来模拟它周期:
@Test
public void deferredConnect() throws InterruptedException {
ConnectableFlux<Integer> connectableFlux = Flux.range(1, 10)
.publish();
AtomicInteger subCount = new AtomicInteger();
Flux<Integer> deferredConnect = connectableFlux
.doOnSubscribe(sub -> {
int current = subCount.incrementAndGet();
Schedulers.parallel().schedule(() -> {
if (subCount.compareAndSet(current, -1)) {
connectableFlux.connect();
}
}, 1, TimeUnit.SECONDS);
});
deferredConnect.subscribe(v -> System.out.println("1: " + v));
Thread.sleep(500);
deferredConnect.subscribe(v -> System.out.println("2: " + v));
Thread.sleep(400);
deferredConnect.subscribe(v -> System.out.println("3: " + v));
Thread.sleep(200);
assertThat(subCount).hasNonNegativeValue();
Thread.sleep(800);
assertThat(subCount).hasNegativeValue();
}
在反应堆中有一种方法可以等待特定数量的订阅者,然后发布者才应该开始发射。
我想实现一个稍微不同的场景:
- 订阅者订阅
- 发布者等待另一个订阅
- 订阅时间少于预期时间 t
- 计时器重置,发布者继续等待另一个订阅者
- 给定时间内没有订阅
- 发布者开始 emmit
如果等待周期持续时间过长(长于时间 t2),则发布者无论如何都会开始发送。
怎么做?
没有 baked-in 运算符可以做到这一点,但您应该能够使用标准 publish()
-connect()
通过计划任务和 AtomicInteger
来模拟它周期:
@Test
public void deferredConnect() throws InterruptedException {
ConnectableFlux<Integer> connectableFlux = Flux.range(1, 10)
.publish();
AtomicInteger subCount = new AtomicInteger();
Flux<Integer> deferredConnect = connectableFlux
.doOnSubscribe(sub -> {
int current = subCount.incrementAndGet();
Schedulers.parallel().schedule(() -> {
if (subCount.compareAndSet(current, -1)) {
connectableFlux.connect();
}
}, 1, TimeUnit.SECONDS);
});
deferredConnect.subscribe(v -> System.out.println("1: " + v));
Thread.sleep(500);
deferredConnect.subscribe(v -> System.out.println("2: " + v));
Thread.sleep(400);
deferredConnect.subscribe(v -> System.out.println("3: " + v));
Thread.sleep(200);
assertThat(subCount).hasNonNegativeValue();
Thread.sleep(800);
assertThat(subCount).hasNegativeValue();
}