如何急切合并两个Flux?

How to eagerly merge two Flux?

Flux<Long> flux1 = Flux
        .<Long>create(fluxSink -> {
            for (long i = 0; i < 20; i++) {
                fluxSink.next(i);
            }
        })
        .filter(aLong -> aLong % 2 == 0)
        .doOnNext(aLong -> System.out.println("flux 1 : " + aLong));

Flux<Long> flux2 = Flux
        .<Long>create(fluxSink -> {
            for (long i = 0; i < 20; i++) {
                fluxSink.next(i);
            }
        })
        .filter(aLong -> aLong % 2 == 1)
        .doOnNext(aLong -> System.out.println("flux 2 : " + aLong));

Flux.merge(flux1, flux2)
        .doOnNext(System.out::println)
        .then()
        .block();

创建两个Flux<Long>像上面的代码

flux1 创建偶数流 (0,2,4,6,8 ...) flux2 创建奇数流 (1,3,5,7,9 ...)

我预计合并这 2 个 flux1 和 flux2 时会像

0,1,2,3,4 ...0,2,1,3,4..取决于计算能力

但总是花费 flux1 和 flux2 (flux1 start)0,2,4,6,8, ... 16,18,(flux1 end)(flux2 start)1,3,5,7 ... 17,19

如何订阅多个流量热切事件?

两个流 运行 在同一线程上。当您订阅时 flux1 开始推送数据直到完成。只有这样线程才有空 flux2 才能继续。 merge 运算符按照它们到达的顺序发出值。它不会在第一个和第二个流之间切换。

如果您想要流运行并发,您需要运行它们在不同的线程上,例如通过使用 publishOn 运算符。

Flux<Long> flux1 = Flux
    .<Long>create(fluxSink -> {
        for (long i = 0; i < 20; i++) {
            fluxSink.next(i);
        }
    })
    .publishOn(Schedulers.newSingle("thread-x")
    .filter(aLong -> aLong % 2 == 0)
    .doOnNext(aLong -> System.out.println("flux 1 : " + aLong));