等待订阅者订阅的给定(可重置)时间

Wait given (reset-able) time for subscribers to subscribe

在反应堆中有一种方法可以等待特定数量的订阅者,然后发布者才应该开始发射。

我想实现一个稍微不同的场景:

如果等待周期持续时间过长(长于时间 t2),则发布者无论如何都会开始发送。

怎么做?

没有 baked-in 运算符可以做到这一点,但您应该能够使用标准 publish()-connect() 通过计划任务和 AtomicInteger 来模拟它周期:

@Test
public void deferredConnect() throws InterruptedException {
    ConnectableFlux<Integer> connectableFlux = Flux.range(1, 10)
        .publish();
    AtomicInteger subCount = new AtomicInteger();

    Flux<Integer> deferredConnect = connectableFlux
        .doOnSubscribe(sub -> {
            int current = subCount.incrementAndGet();
            Schedulers.parallel().schedule(() -> {
                if (subCount.compareAndSet(current, -1)) {
                    connectableFlux.connect();
                }
            }, 1, TimeUnit.SECONDS);
        });

    deferredConnect.subscribe(v -> System.out.println("1: " + v));
    Thread.sleep(500);
    deferredConnect.subscribe(v -> System.out.println("2: " + v));
    Thread.sleep(400);
    deferredConnect.subscribe(v -> System.out.println("3: " + v));
    Thread.sleep(200);
    assertThat(subCount).hasNonNegativeValue();
    Thread.sleep(800);
    assertThat(subCount).hasNegativeValue();
}