使用 subscribeOn 和 publishOn 的最佳实践

Best practice when using subscribeOn and publishOn

我正在使用 Project Reactor 编写我正在编写的新服务。我正在使用 Spring 5Netty。我正在与一堆不同的服务和关系数据库进行交互。所有这些服务都有一个阻塞的客户端,JDBC 也在阻塞。所以基本上 none 这些网络调用 return 发布者。

我的问题是在处理所有阻塞时最佳做法应该是什么 API。目前我所做的是将所有阻塞 API 代码包装到 Mono.callable 中并使用 subscribeOn(Scheduler.elastic())。所以基本上所有的阻塞工作都是在弹性线程池上完成的。

问题1.我应该创建一个专用的线程池执行器而不是使用elastic吗?如果是,那为什么?或者我应该为每个不同的服务创建一个专用的弹性池吗?

问题2.当阻塞方法return结果时,是否应该使用publishOn让主线程重新开始处理? 如果是,我怎样才能得到主线程(netty事件循环线程?)参考?

问题 3. 如果我在多个不同的运算符中调用多个阻塞调用(一些在链接时,一些在使用 zip() 调用时)然后使用 subscribe publishsubscribe publish 不会进行大量上下文切换吗?

这些问题中的大部分都与最佳实践相关。

我不太确定在您的特定情况下使用 WebFlux 可能获得的好处,因为您的所有端点服务都被阻塞了。

1: 基本上你应该根据你的具体硬件和目标行为做出决定。来自官方文档,Java Reactor 与并发无关。这意味着您可以随意使用任何并发性或根本不使用它。据我所知,elastic pool 在底层使用普通的 ExecutorService。我想你应该根据你的具体硬件做出决定,太多的上下文切换和线程从性能的角度来看并不好,特别是如果你没有足够的处理器

2:基本没有,看你需要哪种行为。

3:你自己说清楚了,我不明白你为什么要这么叫publish-subscribe.

Most of these questions are related to what should be the best practices.

Reactive Java 仍处于起步阶段,因此很难找到被接受的 "best practices"(在大多数情况下,整个社区尚未形成共识,因此只有一些来自反应堆开发团队和其他一些人的指导。)

话虽这么说,但我认为现在有一些相当广泛接受的观点会有所帮助。首先,您声明:

All of these services have a client that is blocking.

这可能是最简单的一点,就好像您没有与大量其他非阻塞的服务交互一样,使用反应堆没有什么意义根本。例外情况是,如果您有明确的迁移路径来将这些服务重写为非阻塞服务。

JDBC is also blocking

R2DBC 不是,可能值得研究(尽管该项目仍处于早期阶段。)

Shall I create a dedicated thread pool executor rather than using elastic?

没必要。 (答案是 "yes" 的唯一原因是,如果您有非常具体的线程要求,需要该执行程序服务的特定 "tuning",但如果是这种情况,您就不会问这个问题。 )

但是请考虑使用 boundedElastic(),因为这会对调度程序中的活动线程数量设置上限,这可以减少导致背压问题的失控线程数量。此调度程序专门设计用于连接传统的阻塞 IO 库。

shall I create a dedicate elastic pool for each of different service?

除非您有特殊原因,否则不会。如果你想要一个单独的(或不同的)绑定到不同的服务,这在 boundedElastic() 调度程序中会更相关。 (只是创建一个这样你就可以 "give it a relevant name" 恕我直言,这还不够好。)

When blocking method return result, shall I use publishOn so that main thread picks up processing again?

这完全取决于您的用例 - 反应器故意与并发无关,因此这取决于您是否需要在 "main thread" 中发布结果。 (根据我的经验,通常你不关心。)

how can I get main thread reference back?

publishOn(Schedulers.immediate()) 可用于在您调用的当前线程上发布。

If I’m calling multiple blocking calls in multiple different operators (some while chaining, some while using zip() call) then using subscribe publish and again subscribe publish wouldn’t make a lot of context switching?

视情况而定。但是,是的,当然有可能——这又回到了我的第一点。除非您花在非阻塞代码上的时间超过了在线程池中调用阻塞服务所花的时间,否则切换的动机很小(除非您计划在核心到位时将服务逐渐迁移到非阻塞。)

仅仅切换到 reactor 并将其用作前端以阻塞线程池中包装的服务,您肯定不会获得性能增强。