Project Reactor - 订阅并行调度程序不起作用
Project Reactor - subscribe on parallel scheduler doesn't work
我正在查看示例和阅读文档,但在尝试以并行方式订阅 Flux 时发现了一些问题。
我有3个功能,如下。
private val log = LoggerFactory.getLogger("main")
private val sequence = Flux.just(1, 2)
fun a() {
sequence.subscribeOn(Schedulers.parallel()).subscribe { log.info("*** {}", it) }
sequence.subscribe { log.info(">>> {}", it) }
}
fun b() {
sequence.subscribe { log.info(">>> {}", it) }
}
fun c() {
sequence.subscribeOn(Schedulers.parallel()).subscribe { log.info("*** {}", it) }
}
现在,当我分别 运行 每个方法时,函数 a()
和 b()
都有正确的输出,但 c()
的输出为空。这是意料之中的,是设计使然吗?如果是这样,为什么会这样?
Flux.just(...)
捕获值,因此优化为在订阅 Thread
.
中立即执行
当您使用 subscribeOn
时,您将订阅 Thread
从 main
更改为其他内容,使 just
真正异步。
在 a()
中,如果没有 subscribeOn
,第二个 just
将阻塞主线程,足以使测试在异步替代方法完成之前无法完成。
在c()
中,没有main
线程的这种阻塞。因此,测试在异步 just
有时间发出任何内容之前终止,这就是为什么您看不到输出的原因。
为了让它更明显,添加一个 Thread.sleep(10)
,你会看到一些输出。
我正在查看示例和阅读文档,但在尝试以并行方式订阅 Flux 时发现了一些问题。
我有3个功能,如下。
private val log = LoggerFactory.getLogger("main")
private val sequence = Flux.just(1, 2)
fun a() {
sequence.subscribeOn(Schedulers.parallel()).subscribe { log.info("*** {}", it) }
sequence.subscribe { log.info(">>> {}", it) }
}
fun b() {
sequence.subscribe { log.info(">>> {}", it) }
}
fun c() {
sequence.subscribeOn(Schedulers.parallel()).subscribe { log.info("*** {}", it) }
}
现在,当我分别 运行 每个方法时,函数 a()
和 b()
都有正确的输出,但 c()
的输出为空。这是意料之中的,是设计使然吗?如果是这样,为什么会这样?
Flux.just(...)
捕获值,因此优化为在订阅 Thread
.
当您使用 subscribeOn
时,您将订阅 Thread
从 main
更改为其他内容,使 just
真正异步。
在 a()
中,如果没有 subscribeOn
,第二个 just
将阻塞主线程,足以使测试在异步替代方法完成之前无法完成。
在c()
中,没有main
线程的这种阻塞。因此,测试在异步 just
有时间发出任何内容之前终止,这就是为什么您看不到输出的原因。
为了让它更明显,添加一个 Thread.sleep(10)
,你会看到一些输出。