等待 ParallelFlux 完成
Waiting for ParallelFlux completion
我创建了一个 ParallelFlux
,然后使用了 .sequential()
,期望那时我可以计算或 "reduce" 并行计算的结果。问题似乎是并行线程被解雇了,没有什么在等待它们。
我已经使用 CountDownLatch
完成了一些工作,但我认为我不应该这样做。
TL;DR - 我无法打印出此代码的结果:
Flux.range(0, 1000000)
.parallel()
.runOn(Schedulers.elastic())
.sequential()
.count()
.subscribe(System.out::println);
在 main 中执行该代码时,这是事物异步性质的一个很好的证明。该操作在弹性调度程序的线程上运行,订阅会触发异步处理,因此它会立即 returns。
有两种方法可以使应用程序的结束与 Flux
的结束同步:
在doOnNext
中打印并使用blockLast()
block* 方法通常用于 main 和测试中,在这种情况下别无选择,只能切换回阻塞模型(即,否则 test/main 将在处理结束前退出)。
我们切换 "side effect" 在 doOnNext
中打印每个发出的项目,它专用于此类用例。然后我们阻塞直到通量完成,用blockLast()
.
Flux.range(0, 1000000)
.parallel()
.runOn(Schedulers.elastic())
.sequential()
.count()
.doOnNext(System.out::println)
.blockLast();
在subscribe
中打印并在doFinally
中使用CountDownLatch
这个有点复杂,但如果您想探索该方法的工作原理,您可以实际使用订阅。
我们只需添加一个 doFinally
,它会在任何完成信号(取消、错误或完成)上触发。我们用一个CountDownLatch
来阻塞主线程,主线程会从doFinally
.
里面异步倒计时
CountDownLatch latch = new CountDownLatch(1);
Flux.range(0, 1000000)
.parallel()
.runOn(Schedulers.elastic())
.sequential()
.count()
.doFinally(signal -> latch.countDown())
.subscribe(System.out::println);
latch.await(10, TimeUnit.SECONDS);
我创建了一个 ParallelFlux
,然后使用了 .sequential()
,期望那时我可以计算或 "reduce" 并行计算的结果。问题似乎是并行线程被解雇了,没有什么在等待它们。
我已经使用 CountDownLatch
完成了一些工作,但我认为我不应该这样做。
TL;DR - 我无法打印出此代码的结果:
Flux.range(0, 1000000)
.parallel()
.runOn(Schedulers.elastic())
.sequential()
.count()
.subscribe(System.out::println);
在 main 中执行该代码时,这是事物异步性质的一个很好的证明。该操作在弹性调度程序的线程上运行,订阅会触发异步处理,因此它会立即 returns。
有两种方法可以使应用程序的结束与 Flux
的结束同步:
在doOnNext
中打印并使用blockLast()
block* 方法通常用于 main 和测试中,在这种情况下别无选择,只能切换回阻塞模型(即,否则 test/main 将在处理结束前退出)。
我们切换 "side effect" 在 doOnNext
中打印每个发出的项目,它专用于此类用例。然后我们阻塞直到通量完成,用blockLast()
.
Flux.range(0, 1000000)
.parallel()
.runOn(Schedulers.elastic())
.sequential()
.count()
.doOnNext(System.out::println)
.blockLast();
在subscribe
中打印并在doFinally
CountDownLatch
这个有点复杂,但如果您想探索该方法的工作原理,您可以实际使用订阅。
我们只需添加一个 doFinally
,它会在任何完成信号(取消、错误或完成)上触发。我们用一个CountDownLatch
来阻塞主线程,主线程会从doFinally
.
CountDownLatch latch = new CountDownLatch(1);
Flux.range(0, 1000000)
.parallel()
.runOn(Schedulers.elastic())
.sequential()
.count()
.doFinally(signal -> latch.countDown())
.subscribe(System.out::println);
latch.await(10, TimeUnit.SECONDS);