Project Reactor - 订阅并行调度程序不起作用

Project Reactor - subscribe on parallel scheduler doesn't work

我正在查看示例和阅读文档,但在尝试以并行方式订阅 Flux 时发现了一些问题。

我有3个功能,如下。

private val log = LoggerFactory.getLogger("main")
private val sequence = Flux.just(1, 2)

fun a() {
    sequence.subscribeOn(Schedulers.parallel()).subscribe { log.info("*** {}", it) }
    sequence.subscribe { log.info(">>> {}", it) }
}

fun b() {
    sequence.subscribe { log.info(">>> {}", it) }
}

fun c() {
    sequence.subscribeOn(Schedulers.parallel()).subscribe { log.info("*** {}", it) }
}

现在,当我分别 运行 每个方法时,函数 a()b() 都有正确的输出,但 c() 的输出为空。这是意料之中的,是设计使然吗?如果是这样,为什么会这样?

Flux.just(...) 捕获值,因此优化为在订阅 Thread.

中立即执行

当您使用 subscribeOn 时,您将订阅 Threadmain 更改为其他内容,使 just 真正异步。

a() 中,如果没有 subscribeOn,第二个 just 将阻塞主线程,足以使测试在异步替代方法完成之前无法完成。

c()中,没有main线程的这种阻塞。因此,测试在异步 just 有时间发出任何内容之前终止,这就是为什么您看不到输出的原因。

为了让它更明显,添加一个 Thread.sleep(10),你会看到一些输出。