如何将 delayElements() 与 Flux 合并一起使用?
How to use delayElements() with Flux merge?
我正在学习教程,我相信我的代码与讲师的代码相同,但我不明白为什么 delayElements() 不起作用。
调用方法如下:
public static void main(String[] args) {
FluxAndMonoGeneratorService fluxAndMonoGeneratorService = new FluxAndMonoGeneratorService();
fluxAndMonoGeneratorService.explore_merge()
.doOnComplete(() -> System.out.println("Completed !"))
.onErrorReturn("asdasd")
.subscribe(System.out::println);
}
如果我把没有延迟元素的方法写成:
public Flux<String> explore_merge() {
Flux<String> abcFlux = Flux.just("A", "B", "C");
Flux<String> defFlux = Flux.just("D", "E", "F");
return Flux.merge(abcFlux, defFlux);
}
然后控制台中的输出是(如预期的):
00:53:19.443 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
A
B
C
D
E
F
Completed !
BUILD SUCCESSFUL in 1s
但我想使用 delayElements() 来测试 merge() 方法:
public Flux<String> explore_merge() {
Flux<String> abcFlux = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(151));
Flux<String> defFlux = Flux.just("D", "E", "F").delayElements(Duration.ofMillis(100));
return Flux.merge(abcFlux, defFlux);
}
没有任何反应,onComplete 和 onErrorReturn 都没有,输出什么也没有:
0:55:22: Executing ':reactive-programming-using-reactor:FluxAndMonoGeneratorService.main()'...
> Task :reactive-programming-using-reactor:generateLombokConfig UP-TO-DATE
> Task :reactive-programming-using-reactor:compileJava
> Task :reactive-programming-using-reactor:processResources NO-SOURCE
> Task :reactive-programming-using-reactor:classes
> Task :reactive-programming-using-reactor:FluxAndMonoGeneratorService.main()
00:55:23.715 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
BUILD SUCCESSFUL in 1s
这是什么原因? (我的意思是至少我期待的是 onError 但什么都没有......)
注意:mergeWith() 也不适用于此 delayElements()
subscribe
不是阻塞操作,delayElements
将被安排在另一个线程上(默认为 parallel
调度程序)。结果,您的程序在元素发出之前退出。这是一个测试
@Test
void mergeWithDelayElements() {
Flux<String> abcFlux = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(151));
Flux<String> defFlux = Flux.just("D", "E", "F").delayElements(Duration.ofMillis(100));
StepVerifier.create(Flux.merge(abcFlux, defFlux))
.expectNext("D", "A", "E", "B", "F", "C")
.verifyComplete();
}
我正在学习教程,我相信我的代码与讲师的代码相同,但我不明白为什么 delayElements() 不起作用。
调用方法如下:
public static void main(String[] args) {
FluxAndMonoGeneratorService fluxAndMonoGeneratorService = new FluxAndMonoGeneratorService();
fluxAndMonoGeneratorService.explore_merge()
.doOnComplete(() -> System.out.println("Completed !"))
.onErrorReturn("asdasd")
.subscribe(System.out::println);
}
如果我把没有延迟元素的方法写成:
public Flux<String> explore_merge() {
Flux<String> abcFlux = Flux.just("A", "B", "C");
Flux<String> defFlux = Flux.just("D", "E", "F");
return Flux.merge(abcFlux, defFlux);
}
然后控制台中的输出是(如预期的):
00:53:19.443 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
A
B
C
D
E
F
Completed !
BUILD SUCCESSFUL in 1s
但我想使用 delayElements() 来测试 merge() 方法:
public Flux<String> explore_merge() {
Flux<String> abcFlux = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(151));
Flux<String> defFlux = Flux.just("D", "E", "F").delayElements(Duration.ofMillis(100));
return Flux.merge(abcFlux, defFlux);
}
没有任何反应,onComplete 和 onErrorReturn 都没有,输出什么也没有:
0:55:22: Executing ':reactive-programming-using-reactor:FluxAndMonoGeneratorService.main()'...
> Task :reactive-programming-using-reactor:generateLombokConfig UP-TO-DATE
> Task :reactive-programming-using-reactor:compileJava
> Task :reactive-programming-using-reactor:processResources NO-SOURCE
> Task :reactive-programming-using-reactor:classes
> Task :reactive-programming-using-reactor:FluxAndMonoGeneratorService.main()
00:55:23.715 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
BUILD SUCCESSFUL in 1s
这是什么原因? (我的意思是至少我期待的是 onError 但什么都没有......)
注意:mergeWith() 也不适用于此 delayElements()
subscribe
不是阻塞操作,delayElements
将被安排在另一个线程上(默认为 parallel
调度程序)。结果,您的程序在元素发出之前退出。这是一个测试
@Test
void mergeWithDelayElements() {
Flux<String> abcFlux = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(151));
Flux<String> defFlux = Flux.just("D", "E", "F").delayElements(Duration.ofMillis(100));
StepVerifier.create(Flux.merge(abcFlux, defFlux))
.expectNext("D", "A", "E", "B", "F", "C")
.verifyComplete();
}