等待 ParallelFlux 完成

Waiting for ParallelFlux completion

我创建了一个 ParallelFlux,然后使用了 .sequential(),期望那时我可以计算或 "reduce" 并行计算的结果。问题似乎是并行线程被解雇了,没有什么在等待它们。

我已经使用 CountDownLatch 完成了一些工作,但我认为我不应该这样做。

TL;DR - 我无法打印出此代码的结果:

    Flux.range(0, 1000000)
    .parallel()
    .runOn(Schedulers.elastic())
    .sequential()
    .count()
    .subscribe(System.out::println);

在 main 中执行该代码时,这是事物异步性质的一个很好的证明。该操作在弹性调度程序的线程上运行,订阅会触发异步处理,因此它会立即 returns。

有两种方法可以使应用程序的结束与 Flux 的结束同步:

doOnNext中打印并使用blockLast()

block* 方法通常用于 main 和测试中,在这种情况下别无选择,只能切换回阻塞模型(即,否则 test/main 将在处理结束前退出)。

我们切换 "side effect" 在 doOnNext 中打印每个发出的项目,它专用于此类用例。然后我们阻塞直到通量完成,用blockLast().

Flux.range(0, 1000000)
    .parallel()
    .runOn(Schedulers.elastic())
    .sequential()
    .count()
    .doOnNext(System.out::println)
    .blockLast();

subscribe中打印并在doFinally

中使用CountDownLatch

这个有点复杂,但如果您想探索该方法的工作原理,您可以实际使用订阅。

我们只需添加一个 doFinally,它会在任何完成信号(取消、错误或完成)上触发。我们用一个CountDownLatch来阻塞主线程,主线程会从doFinally.

里面异步倒计时
CountDownLatch latch = new CountDownLatch(1);

Flux.range(0, 1000000)
    .parallel()
    .runOn(Schedulers.elastic())
    .sequential()
    .count()
    .doFinally(signal -> latch.countDown())
    .subscribe(System.out::println);

latch.await(10, TimeUnit.SECONDS);