What is the best way to bind a periodic action to the Mono lifecycle?

I want to perform an action every second while a Mono is active. What is the best way to do this?

Here is an option that works, but it feels like a workaround:

// Some long life async action
Mono<String> asyncAction = Mono.delay(Duration.ofSeconds(60)).map(d -> "Hello");

Mono<String> periodicAction = Flux.interval(Duration.ofSeconds(1))
        .doOnNext(d -> {
            // Do something every second while the async action is running
        })
        .last()
        .flatMap(d -> Mono.never());

Mono.firstWithSignal(asyncAction, periodicAction)
        // Another logic
        .subscribe();

Consider using the zip method, for example:

static <T1, T2> Flux<Tuple2<T1, T2>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2)

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#zip-org.reactivestreams.Publisher-org.reactivestreams.Publisher-

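Example:

        Flux.zip(
                // Periodic action: runs on every tick until the Mono below completes
                Flux.interval(Duration.ofSeconds(1)).doOnNext(System.out::println).log(),
                // Long-running async action
                Mono.delay(Duration.ofSeconds(5)).map(d -> "Hello").log()
        )
                // Keep only the async action's result and discard the tick
                .map(Tuple2::getT2)
                .log()
                .as(StepVerifier::create)
                .expectNextCount(1)
                .verifyComplete();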

The Javadoc for the zip method says:

Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2. The operator will continue doing so until any of the sources completes.
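One caveat with a long-running action, such as the 60-second Mono in the question: as far as I know, zip only requests a bounded number of elements from each source up front, and Flux.interval signals an overflow error when a tick is due but has not been requested. The sketch below wraps the zip approach in a helper and drops surplus ticks after the side effect has run. The class name PeriodicWhileActive and the helper withPeriodic are hypothetical names made up for this illustration, and the use of onBackpressureDrop is my assumption about how to avoid the overflow, not something stated in the Javadoc quoted above.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

final class PeriodicWhileActive {

    // Hypothetical helper (not part of Reactor): runs `tick` once per `period`
    // until `action` terminates, then emits the action's value.
    static <T> Mono<T> withPeriodic(Mono<T> action, Duration period, Runnable tick) {
        Flux<Long> ticks = Flux.interval(period)
                .doOnNext(i -> tick.run())
                // Assumption: zip buffers only a bounded number of unpaired ticks,
                // so drop the surplus instead of letting interval overflow.
                .onBackpressureDrop();

        return Flux.zip(ticks, action)
                // Keep the action's value and discard the tick it was paired with
                .map(Tuple2::getT2)
                .next();
    }

    public static void main(String[] args) {
        AtomicInteger count = new AtomicInteger();
        String result = withPeriodic(
                Mono.delay(Duration.ofMillis(550)).map(d -> "Hello"),
                Duration.ofMillis(100),
                count::incrementAndGet)
                .block(Duration.ofSeconds(5));
        System.out.println(result + " after " + count.get() + " ticks");
    }
}
```

Because zip needs one element from each source before it can emit, the result only appears once at least one tick is available to pair with it. An action that finishes in less than one period is therefore delayed until the first tick.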