如何将 delayElements() 与 Flux 合并一起使用?

How to use delayElements() with Flux merge?

我正在学习教程,我相信我的代码与讲师的代码相同,但我不明白为什么 delayElements() 不起作用。

调用方法如下:

public static void main(String[] args) {
    FluxAndMonoGeneratorService fluxAndMonoGeneratorService = new FluxAndMonoGeneratorService();
    fluxAndMonoGeneratorService.explore_merge()
            .doOnComplete(() -> System.out.println("Completed !"))
            .onErrorReturn("asdasd")
            .subscribe(System.out::println);
}

如果我把没有延迟元素的方法写成:

public Flux<String> explore_merge() {

        Flux<String> abcFlux = Flux.just("A", "B", "C");
        Flux<String> defFlux = Flux.just("D", "E", "F");

        return Flux.merge(abcFlux, defFlux);
    }

然后控制台中的输出是(如预期的):

00:53:19.443 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
A
B
C
D
E
F
Completed !

BUILD SUCCESSFUL in 1s

但我想使用 delayElements() 来测试 merge() 方法:

public Flux<String> explore_merge() {

        Flux<String> abcFlux = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(151));
        Flux<String> defFlux = Flux.just("D", "E", "F").delayElements(Duration.ofMillis(100));

        return Flux.merge(abcFlux, defFlux);
    }

没有任何反应,onComplete 和 onErrorReturn 都没有,输出什么也没有:

0:55:22: Executing ':reactive-programming-using-reactor:FluxAndMonoGeneratorService.main()'...

> Task :reactive-programming-using-reactor:generateLombokConfig UP-TO-DATE
> Task :reactive-programming-using-reactor:compileJava
> Task :reactive-programming-using-reactor:processResources NO-SOURCE
> Task :reactive-programming-using-reactor:classes

> Task :reactive-programming-using-reactor:FluxAndMonoGeneratorService.main()
00:55:23.715 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework

BUILD SUCCESSFUL in 1s

这是什么原因? (我的意思是至少我期待的是 onError 但什么都没有......)

注意:mergeWith() 也不适用于此 delayElements()

subscribe 不是阻塞操作,delayElements 将被安排在另一个线程上(默认为 parallel 调度程序)。结果,您的程序在元素发出之前退出。这是一个测试

@Test
void mergeWithDelayElements() {
    Flux<String> abcFlux = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(151));
    Flux<String> defFlux = Flux.just("D", "E", "F").delayElements(Duration.ofMillis(100));

    StepVerifier.create(Flux.merge(abcFlux, defFlux))
            .expectNext("D", "A", "E", "B", "F", "C")
            .verifyComplete();
}