WebFlux:如何使用 takeUntilOther() 方法?

WebFlux: how to work takeUntilOther() method?

谁能解释一下 takeUntilOther() 方法是如何工作的?我尝试 运行 以下代码,但它在我的控制台上没有任何显示。

     Mono.just(10)
                .subscribe();

        Flux.range(1, 5)
                .takeUntilOther(Mono.just(10))
                .subscribe(System.out::println);

我不明白为什么。

基里尔,

我建议您参考 the project reactor's documentation 的适当部分。

takeUntilOther(Publisher<?> other) Relay values from this Flux until the given Publisher emits.

意思是,在给定 Publisher<?> other 开始生成事件之前,您将从原始 Flux 接收值。在您的情况下,您有一个 hot publisher just() 会立即中断原始 Flux(通过调用 cancel() 方法)。

我再举一个例子。看看下面的代码片段:

Flux.range(1, 5) // produces elements from 1 to 5
        .delayElements(Duration.ofSeconds(1)) // delays emission of each element from above for 1 second
        .takeUntilOther(Mono
                .just(10) // hot publisher. emits one element

                // delays '10' for 3 seconds. meaning that it will only 
                // appears in the original Flux in 3 seconds
                .delayElement(Duration.ofSeconds(3)) 
        )
        .subscribe(System.out::print);

输出为:

12

添加 Thread.sleep 等待主线程(或任何线程当前代码是 运行),以便订阅者线程将继续处理。这里是单元测试版本。

    @Test
    public void flux_Skip_Take_Based_On_Other_Streams() throws InterruptedException {
        
        Flux.range(1, 100) // publisher with elements from 1-100
                .delayElements(Duration.ofSeconds(1)) // Flux delay 1 sec before each element emit
                .skipUntilOther(Mono.just(10).delayElement(Duration.ofSeconds(10))) // skip the elements until inner Mono emits, i.e.  for 10 seconds
                .takeUntilOther(Mono.just(10).delayElement(Duration.ofSeconds(70))) // take the elements until inner mono emits, i.e. till 70 seconds
                .log()
                .subscribe();
    
        Thread.sleep(1000*100); //Sleep the main thread for 100 sec or more to verify the logs
}

输出:

2022-05-27 17:45:33.317  INFO 4180 --- [           main] reactor.Flux.TakeUntilOther.1            : onSubscribe(SerializedSubscriber)
2022-05-27 17:45:33.317  INFO 4180 --- [           main] reactor.Flux.TakeUntilOther.1            : request(unbounded)
2022-05-27 17:45:43.469  INFO 4180 --- [    parallel-12] reactor.Flux.TakeUntilOther.1            : onNext(10)
2022-05-27 17:45:44.485  INFO 4180 --- [     parallel-1] reactor.Flux.TakeUntilOther.1            : onNext(11)
.
.
.
2022-05-27 17:46:42.098  INFO 4180 --- [    parallel-10] reactor.Flux.TakeUntilOther.1            : onNext(68)
2022-05-27 17:46:43.103  INFO 4180 --- [    parallel-11] reactor.Flux.TakeUntilOther.1            : onNext(69)
2022-05-27 17:46:43.306  INFO 4180 --- [     parallel-1] reactor.Flux.TakeUntilOther.1            : onComplete()