在多个 Flux 结束时同步,比如 subscribe() 和 then()

Synchronize at the end of multiple Fluxes, like subscribe() with then()

我有不同的 Fluxes 做某事,例如将值存储在数据库中或简单地将它们打印出来。所有都是有限的,我想在所有 Fluxes 完全消耗后执行一个操作:

public void run(String... args) {

    Flux<String> firstFlux = Flux.just("a", "b", "c");
    Flux<Integer> secondFlux = Flux.just(1, 2, 3);
    Flux<Object> thirdFlux = Flux.just(1, "2", 3);

    firstFlux
            .doOnComplete(() -> log.info("first Flux has completed."))
            .subscribe(s -> insertIntoDbString(s));

    secondFlux
            .doOnComplete(() -> log.info("second Flux has completed."))
            .subscribe(i -> insertIntoDbInteger(i));

    thirdFlux
            .doOnComplete(() -> log.info("third Flux has completed."))
            .subscribe(o -> insertIntoDbObject(o));

    // do something after all Fluxes have completed:
    // log.info("all Fluxes have completed.");

}

private void insertIntoDbObject(Object s) {
    log.info("inserting Object: {}", s);
}

private void insertIntoDbInteger(Integer s) {
    log.info("inserting Integer: {}", s);
}

private void insertIntoDbString(String s) {
    log.info("inserting String: {}", s);
}

我怎样才能做到这一点?我想,我可以为每个 Flux 使用 then() 来接收 Mono<Void>,收集它们并在那里使用 doOnComplete() 但后来我似乎无法订阅到 Flux 了。

非常感谢。

您可以使用 Flux.merge 合并所有通量并用 doOnNext 替换您的订阅方法,例如:

    Flux.merge(
           firstFlux
                .doOnComplete(() -> log.info("first Flux has completed."))
                .doOnNext(s -> insertIntoDbString(s)),
           secondFlux
                .doOnComplete(() -> log.info("second Flux has completed."))
                .doOnNext(s -> insertIntoDbInteger(s)),
           thirdFlux
                .doOnComplete(() -> log.info("third Flux has completed."))
                .doOnNext(s -> insertIntoDbObject(s))
    )
    .doOnComplete(() -> log.info("All flux has completed."))

如果您不关心传递每个通量发出的转发值,只关心完成它们中的每一个,最合适的运算符是Mono.when。它清楚地表明,您不需要发出的数据,只需要通量已完成的信息。

Mono.when(
    firstFlux
            .doOnComplete(() -> log.info("first Flux has completed."))
            .doOnNext(s -> insertIntoDbString(s)),

    secondFlux
            .doOnComplete(() -> log.info("second Flux has completed."))
            .doOnNext(i -> insertIntoDbInteger(i)),

    thirdFlux
            .doOnComplete(() -> log.info("third Flux has completed."))
            .doOnNext(o -> insertIntoDbObject(o))
).doOnSuccess(aVoid -> log.info("all Fluxes have completed."));