Spring Reactor Webflux 调度程序并行度
Spring Reactor Webflux Scheduler Parallelism
对于完全非阻塞的端到端响应式调用,是否建议显式调用 publishOn 或 subscribeOn 来切换调度程序?对于 cpu 消耗性任务或非消耗性任务,始终使用并行通量来优化性能是否有利?
For a fully non-blocking end to end reactive calls, is it recommended to explicitly call publishOn or subscribeOn to switch schedulers?
publishOn
用于向下游发布数据,而subscribeOn
用于从上游消费数据。所以这真的取决于你想从事什么样的工作。
For either cpu consuming or non consuming tasks, is it favorable to always use parallel flux to optimize performance?
绝对不是,考虑这个例子:
Flux.range(1, 10)
.parallel(4)
.runOn(Schedulers.parallel())
.sequential()
.elapsed()
.subscribe(i -> System.out.printf(" %s ", i));
上面的代码完全是浪费,因为 i
几乎会立即处理。以下代码将比上面的代码执行得更好:
Flux.range(1, 10)
.elapsed()
.subscribe(i -> System.out.printf(" %s ", i));
现在考虑一下:
public static <T> T someMethodThatBlocks(T i, int ms) {
try { Thread.sleep( ms ); }
catch (InterruptedException e) {}
return i;
}
// some method here
Flux.range(1, 10)
.parallel(4)
.runOn(Schedulers.parallel())
.map(i -> someMethodThatBlocks(i, 200))
.sequential()
.elapsed()
.subscribe(i -> System.out.printf(" %s ", i));
输出类似于:
[210,3] [5,1] [0,2] [0,4] [196,6] [0,8] [0,5] [4,7] [196,10] [0,9]
如您所见,第一个响应在 210
毫秒后出现,随后是 3 个响应,两者之间的时间接近 0
。这个循环一次又一次地重复。这是您应该使用平行通量的地方。请注意,创建更多线程并不能保证性能,因为当有更多线程时,上下文切换会增加很多开销,因此应该在部署之前对代码进行测试。如果有很多阻塞调用,每个 cpu 有超过 1 个线程可能会给你带来性能提升,但如果调用是 cpu 密集的,那么每个 cpu 有多个线程] 会因上下文切换而降低性能。
总而言之,这始终取决于您想要实现的目标。
值得一提的是,我假设这里的上下文是 Webflux 而不是一般的反应器(因为这个问题被标记为这样。)如果我们谈论的是通用的反应器用例,建议当然会有很大差异不考虑 Webflux。
For a fully non-blocking end to end reactive calls, is it recommended to explicitly call publishOn or subscribeOn to switch schedulers?
一般建议是不要显式调用这些方法,除非您有理由这样做。 (在正确的上下文中使用它们没有错,但这样做 "just because" 可能不会带来任何好处。)
For either cpu consuming or non consuming tasks, is it favorable to always use parallel flux to optimize performance?
这取决于您要实现的目标,以及您所说的 "CPU consuming"(或 CPU 密集型)任务的含义。请注意,我在这里谈论的是 genuinelyCPU 密集任务而不是阻塞代码——在这种情况下,我最好将 CPU 密集部分分出来到另一个微服务,使您能够根据需要独立于您的 Webflux 服务进行扩展。
使用并行通量(并且 运行 它在并行调度程序上)应该使用所有可用的核心来处理数据 - 这很可能会导致它被更快地处理。但请记住,默认情况下,每个核心都有一个事件循环 运行,因此您实际上已经 "stolen" 事件循环中的一些可用容量来实现这一点。这是否理想取决于您的用例,但通常不会带来太多好处。
相反,我推荐两种方法:
- 如果您可以将 CPU 密集型任务分解为小的、低强度的块,那么就这样做 - 然后您可以将其保留在事件循环中。这允许事件循环保持 运行 及时,同时这些 CPU 密集型任务被安排得很好。
- 如果你不能分解它,启动一个单独的调度程序(可选择低优先级,这样它就不太可能从事件循环中窃取资源),然后将你所有的 CPU 密集型任务集中起来到那个。这有创建更多线程的缺点,但再次保持事件循环空闲。默认情况下,您将拥有与事件循环的内核一样多的线程 - 您可能希望减少它以便为您的 "CPU intensive" 调度程序提供更多内核。
对于完全非阻塞的端到端响应式调用,是否建议显式调用 publishOn 或 subscribeOn 来切换调度程序?对于 cpu 消耗性任务或非消耗性任务,始终使用并行通量来优化性能是否有利?
For a fully non-blocking end to end reactive calls, is it recommended to explicitly call publishOn or subscribeOn to switch schedulers?
publishOn
用于向下游发布数据,而subscribeOn
用于从上游消费数据。所以这真的取决于你想从事什么样的工作。
For either cpu consuming or non consuming tasks, is it favorable to always use parallel flux to optimize performance?
绝对不是,考虑这个例子:
Flux.range(1, 10)
.parallel(4)
.runOn(Schedulers.parallel())
.sequential()
.elapsed()
.subscribe(i -> System.out.printf(" %s ", i));
上面的代码完全是浪费,因为 i
几乎会立即处理。以下代码将比上面的代码执行得更好:
Flux.range(1, 10)
.elapsed()
.subscribe(i -> System.out.printf(" %s ", i));
现在考虑一下:
public static <T> T someMethodThatBlocks(T i, int ms) {
try { Thread.sleep( ms ); }
catch (InterruptedException e) {}
return i;
}
// some method here
Flux.range(1, 10)
.parallel(4)
.runOn(Schedulers.parallel())
.map(i -> someMethodThatBlocks(i, 200))
.sequential()
.elapsed()
.subscribe(i -> System.out.printf(" %s ", i));
输出类似于:
[210,3] [5,1] [0,2] [0,4] [196,6] [0,8] [0,5] [4,7] [196,10] [0,9]
如您所见,第一个响应在 210
毫秒后出现,随后是 3 个响应,两者之间的时间接近 0
。这个循环一次又一次地重复。这是您应该使用平行通量的地方。请注意,创建更多线程并不能保证性能,因为当有更多线程时,上下文切换会增加很多开销,因此应该在部署之前对代码进行测试。如果有很多阻塞调用,每个 cpu 有超过 1 个线程可能会给你带来性能提升,但如果调用是 cpu 密集的,那么每个 cpu 有多个线程] 会因上下文切换而降低性能。
总而言之,这始终取决于您想要实现的目标。
值得一提的是,我假设这里的上下文是 Webflux 而不是一般的反应器(因为这个问题被标记为这样。)如果我们谈论的是通用的反应器用例,建议当然会有很大差异不考虑 Webflux。
For a fully non-blocking end to end reactive calls, is it recommended to explicitly call publishOn or subscribeOn to switch schedulers?
一般建议是不要显式调用这些方法,除非您有理由这样做。 (在正确的上下文中使用它们没有错,但这样做 "just because" 可能不会带来任何好处。)
For either cpu consuming or non consuming tasks, is it favorable to always use parallel flux to optimize performance?
这取决于您要实现的目标,以及您所说的 "CPU consuming"(或 CPU 密集型)任务的含义。请注意,我在这里谈论的是 genuinelyCPU 密集任务而不是阻塞代码——在这种情况下,我最好将 CPU 密集部分分出来到另一个微服务,使您能够根据需要独立于您的 Webflux 服务进行扩展。
使用并行通量(并且 运行 它在并行调度程序上)应该使用所有可用的核心来处理数据 - 这很可能会导致它被更快地处理。但请记住,默认情况下,每个核心都有一个事件循环 运行,因此您实际上已经 "stolen" 事件循环中的一些可用容量来实现这一点。这是否理想取决于您的用例,但通常不会带来太多好处。
相反,我推荐两种方法:
- 如果您可以将 CPU 密集型任务分解为小的、低强度的块,那么就这样做 - 然后您可以将其保留在事件循环中。这允许事件循环保持 运行 及时,同时这些 CPU 密集型任务被安排得很好。
- 如果你不能分解它,启动一个单独的调度程序(可选择低优先级,这样它就不太可能从事件循环中窃取资源),然后将你所有的 CPU 密集型任务集中起来到那个。这有创建更多线程的缺点,但再次保持事件循环空闲。默认情况下,您将拥有与事件循环的内核一样多的线程 - 您可能希望减少它以便为您的 "CPU intensive" 调度程序提供更多内核。