WebFlux:如何使用 takeUntilOther() 方法?
WebFlux: how to work takeUntilOther() method?
谁能解释一下 takeUntilOther()
方法是如何工作的?我尝试 运行 以下代码,但它在我的控制台上没有任何显示。
Mono.just(10)
.subscribe();
Flux.range(1, 5)
.takeUntilOther(Mono.just(10))
.subscribe(System.out::println);
我不明白为什么。
基里尔,
我建议您参考 the project reactor's documentation 的适当部分。
takeUntilOther(Publisher<?> other)
Relay values from this Flux until the given Publisher emits.
意思是,在给定 Publisher<?> other
开始生成事件之前,您将从原始 Flux 接收值。在您的情况下,您有一个 hot publisher just()
会立即中断原始 Flux(通过调用 cancel()
方法)。
我再举一个例子。看看下面的代码片段:
Flux.range(1, 5) // produces elements from 1 to 5
.delayElements(Duration.ofSeconds(1)) // delays emission of each element from above for 1 second
.takeUntilOther(Mono
.just(10) // hot publisher. emits one element
// delays '10' for 3 seconds. meaning that it will only
// appears in the original Flux in 3 seconds
.delayElement(Duration.ofSeconds(3))
)
.subscribe(System.out::print);
输出为:
12
添加 Thread.sleep 等待主线程(或任何线程当前代码是 运行),以便订阅者线程将继续处理。这里是单元测试版本。
@Test
public void flux_Skip_Take_Based_On_Other_Streams() throws InterruptedException {
Flux.range(1, 100) // publisher with elements from 1-100
.delayElements(Duration.ofSeconds(1)) // Flux delay 1 sec before each element emit
.skipUntilOther(Mono.just(10).delayElement(Duration.ofSeconds(10))) // skip the elements until inner Mono emits, i.e. for 10 seconds
.takeUntilOther(Mono.just(10).delayElement(Duration.ofSeconds(70))) // take the elements until inner mono emits, i.e. till 70 seconds
.log()
.subscribe();
Thread.sleep(1000*100); //Sleep the main thread for 100 sec or more to verify the logs
}
输出:
2022-05-27 17:45:33.317 INFO 4180 --- [ main] reactor.Flux.TakeUntilOther.1 : onSubscribe(SerializedSubscriber)
2022-05-27 17:45:33.317 INFO 4180 --- [ main] reactor.Flux.TakeUntilOther.1 : request(unbounded)
2022-05-27 17:45:43.469 INFO 4180 --- [ parallel-12] reactor.Flux.TakeUntilOther.1 : onNext(10)
2022-05-27 17:45:44.485 INFO 4180 --- [ parallel-1] reactor.Flux.TakeUntilOther.1 : onNext(11)
.
.
.
2022-05-27 17:46:42.098 INFO 4180 --- [ parallel-10] reactor.Flux.TakeUntilOther.1 : onNext(68)
2022-05-27 17:46:43.103 INFO 4180 --- [ parallel-11] reactor.Flux.TakeUntilOther.1 : onNext(69)
2022-05-27 17:46:43.306 INFO 4180 --- [ parallel-1] reactor.Flux.TakeUntilOther.1 : onComplete()
谁能解释一下 takeUntilOther()
方法是如何工作的?我尝试 运行 以下代码,但它在我的控制台上没有任何显示。
Mono.just(10)
.subscribe();
Flux.range(1, 5)
.takeUntilOther(Mono.just(10))
.subscribe(System.out::println);
我不明白为什么。
基里尔,
我建议您参考 the project reactor's documentation 的适当部分。
takeUntilOther(Publisher<?> other)
Relay values from this Flux until the given Publisher emits.
意思是,在给定 Publisher<?> other
开始生成事件之前,您将从原始 Flux 接收值。在您的情况下,您有一个 hot publisher just()
会立即中断原始 Flux(通过调用 cancel()
方法)。
我再举一个例子。看看下面的代码片段:
Flux.range(1, 5) // produces elements from 1 to 5
.delayElements(Duration.ofSeconds(1)) // delays emission of each element from above for 1 second
.takeUntilOther(Mono
.just(10) // hot publisher. emits one element
// delays '10' for 3 seconds. meaning that it will only
// appears in the original Flux in 3 seconds
.delayElement(Duration.ofSeconds(3))
)
.subscribe(System.out::print);
输出为:
12
添加 Thread.sleep 等待主线程(或任何线程当前代码是 运行),以便订阅者线程将继续处理。这里是单元测试版本。
@Test
public void flux_Skip_Take_Based_On_Other_Streams() throws InterruptedException {
Flux.range(1, 100) // publisher with elements from 1-100
.delayElements(Duration.ofSeconds(1)) // Flux delay 1 sec before each element emit
.skipUntilOther(Mono.just(10).delayElement(Duration.ofSeconds(10))) // skip the elements until inner Mono emits, i.e. for 10 seconds
.takeUntilOther(Mono.just(10).delayElement(Duration.ofSeconds(70))) // take the elements until inner mono emits, i.e. till 70 seconds
.log()
.subscribe();
Thread.sleep(1000*100); //Sleep the main thread for 100 sec or more to verify the logs
}
输出:
2022-05-27 17:45:33.317 INFO 4180 --- [ main] reactor.Flux.TakeUntilOther.1 : onSubscribe(SerializedSubscriber)
2022-05-27 17:45:33.317 INFO 4180 --- [ main] reactor.Flux.TakeUntilOther.1 : request(unbounded)
2022-05-27 17:45:43.469 INFO 4180 --- [ parallel-12] reactor.Flux.TakeUntilOther.1 : onNext(10)
2022-05-27 17:45:44.485 INFO 4180 --- [ parallel-1] reactor.Flux.TakeUntilOther.1 : onNext(11)
.
.
.
2022-05-27 17:46:42.098 INFO 4180 --- [ parallel-10] reactor.Flux.TakeUntilOther.1 : onNext(68)
2022-05-27 17:46:43.103 INFO 4180 --- [ parallel-11] reactor.Flux.TakeUntilOther.1 : onNext(69)
2022-05-27 17:46:43.306 INFO 4180 --- [ parallel-1] reactor.Flux.TakeUntilOther.1 : onComplete()