Cats Effect: which thread pool to use for Non-Blocking IO?

From this tutorial https://github.com/slouc/concurrency-in-scala-with-ce#threading, asynchronous operations are divided into 3 groups and require significantly different thread pools to run on:

Non-blocking asynchronous operations:

Bounded pool with a very low number of threads (maybe even just one), with a very high priority. These threads will basically just sit idle most of the time and keep polling whether there is a new async IO notification. Time that these threads spend processing a request directly maps into application latency, so it's very important that no other work gets done in this pool apart from receiving notifications and forwarding them to the rest of the application.

Blocking asynchronous operations:

Unbounded cached pool. Unbounded because blocking operation can (and will) block a thread for some time, and we want to be able to serve other I/O requests in the meantime. Cached because we could run out of memory by creating too many threads, so it’s important to reuse existing threads.

CPU-heavy operations:

Fixed pool in which number of threads equals the number of CPU cores. This is pretty straightforward. Back in the day the "golden rule" was number of threads = number of CPU cores + 1, but "+1" was coming from the fact that one extra thread was always reserved for I/O (as explained above, we now have separate pools for that).
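
For concreteness, here is a minimal sketch of what these three pools could look like when built with plain JDK executors (the pool sizes, priorities, and thread names are illustrative, not prescribed by the tutorial):

import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.ExecutionContext

// 1. Non-blocking IO notifications: tiny bounded pool of high-priority daemon threads
val nonBlockingPool: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1, new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "non-blocking-io")
      t.setPriority(Thread.MAX_PRIORITY)
      t.setDaemon(true)
      t
    }
  }))

// 2. Blocking IO: unbounded cached pool, so a blocked thread does not starve other requests
val blockingPool: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newCachedThreadPool())

// 3. CPU-bound work: fixed pool sized to the number of cores
val cpuPool: ExecutionContext =
  ExecutionContext.fromExecutor(
    Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors()))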

In my Cats Effect application I use the ReactiveMongo library, which is based on Scala Future, to access MongoDB. It does not block threads while talking to MongoDB, i.e. it performs non-blocking IO.

It requires an ExecutionContext. Cats Effect provides a default execution context, IOApp.executionContext.

My question is: which execution context should I use for non-blocking IO?

IOApp.executionContext?

However, from the IOApp.executionContext documentation:

Provides a default ExecutionContext for the app.

The default on top of the JVM is lazily constructed as a fixed thread pool based on the number of available CPUs (see PoolUtils).

It seems that this execution context belongs to the 3rd group listed above - CPU-heavy operations (fixed pool in which the number of threads equals the number of CPU cores) - which makes me think that IOApp.executionContext is not a good candidate for non-blocking IO.

Am I right, and should I create a separate context with a fixed thread pool (1 or 2 threads) for non-blocking IO, so that it falls into the first group listed above (Non-blocking asynchronous operations: Bounded pool with a very low number of threads (maybe even just one), with a very high priority)?

Or is IOApp.executionContext designed for both CPU-bound and non-blocking IO operations?

The function I use to convert a Scala Future to F, which requires an ExecutionContext:

def scalaFutureToF[F[_]: Async, A](
      future: => Future[A]
  )(implicit ec: ExecutionContext): F[A] =
    Async[F].async { cb =>
      future.onComplete {
        case Success(value)     => cb(Right(value))
        case Failure(exception) => cb(Left(exception))
      }
    }
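
For illustration, here is how it might be wired up with a dedicated low-thread-count context like the one described in the first group above (queryMongo and the pool below are hypothetical stand-ins, not my real code):

implicit val nonBlockingEc: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))

// hypothetical stand-in for a ReactiveMongo call that returns a Future
def queryMongo(): Future[Int] = Future.successful(42)

val result: IO[Int] = scalaFutureToF[IO, Int](queryMongo())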

In Cats Effect 3, each IOApp has a Runtime:

final class IORuntime private[effect] (
  val compute: ExecutionContext,
  private[effect] val blocking: ExecutionContext,
  val scheduler: Scheduler,
  val shutdown: () => Unit,
  val config: IORuntimeConfig,
  private[effect] val fiberErrorCbs: FiberErrorHashtable = new FiberErrorHashtable(16)
)

You will almost always want to keep the defaults and not fiddle around with declaring your own runtime, except perhaps in tests or educational examples.

Inside your IOApp you can then access the compute pool via:

runtime.compute

If you want to execute a blocking operation, then you can use the blocking construct:

blocking(IO(println("foo!"))) >> IO.unit

This way, you are telling the CE3 runtime that this operation could be blocking and should hence be dispatched to a dedicated pool. See here.
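
As a small sketch (not part of the original snippet), you can observe the dispatch by printing thread names; the exact names depend on the CE3 version:

val demo: IO[Unit] =
  IO(println(s"compute:  ${Thread.currentThread.getName}")) >>
    IO.blocking(println(s"blocking: ${Thread.currentThread.getName}"))
// the first println runs on an io-compute-* thread, the second on a thread
// from the blocking side of the runtime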

What about CE2? Well, it has similar mechanisms, but they are quite clunky and also contain a fair number of surprises. For example, blocking calls are scheduled using a Blocker, which then has to be either summoned out of thin air or threaded through the whole application, and thread pool definitions are done with the awkward ContextShift. If you have any choice in the matter, I strongly recommend investing some effort into migrating to CE3.
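
For comparison, a rough CE2-style sketch of the same blocking example (this assumes a CE2 IOApp, where an implicit ContextShift[IO] is already in scope):

import cats.effect.{Blocker, ContextShift, IO}

def blockingPrint(implicit cs: ContextShift[IO]): IO[Unit] =
  Blocker[IO].use { blocker =>
    // Blocker wraps a dedicated ExecutionContext meant for blocking work
    blocker.delay[IO, Unit](println("foo!"))
  }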

Fine, but what about ReactiveMongo?

ReactiveMongo uses Netty, which is based on the Java NIO API. And Netty has its own thread pool. This is changed in Netty 5 (see here), but ReactiveMongo seems to still be on Netty 4 (see here).

However, the ExecutionContext you are asking about is the thread pool that will perform the callbacks. This can be your compute pool.

Let's see some code. First, your translation method. I just changed async to async_ because I'm using CE3, and I added a println of the thread name:

def scalaFutureToF[F[_]: Async, A](future: => Future[A])(implicit ec: ExecutionContext): F[A] =
  Async[F].async_ { cb =>
    future.onComplete {
      case Success(value)     => {
        println(s"Inside Callback: [${Thread.currentThread.getName}]")
        cb(Right(value))
      }
      case Failure(exception) => cb(Left(exception))
    }
  }

Now let's say we have two execution contexts - one coming from our IOApp, and one that will stand in for whatever ReactiveMongo uses to run the Future. This is the made-up ReactiveMongo one:

val reactiveMongoContext: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))

The other one will simply be runtime.compute.

Now let's define the Future like this:

def myFuture: Future[Unit] = Future {
  println(s"Inside Future: [${Thread.currentThread.getName}]")
}(reactiveMongoContext)

Note how we are pretending that this Future runs within ReactiveMongo by passing the reactiveMongoContext to it.

Finally, let's run the app:

override def run: IO[Unit] = {
  val myContext: ExecutionContext = runtime.compute
  scalaFutureToF(myFuture)(implicitly[Async[IO]], myContext)
}

This is the output:

Inside Future: [pool-1-thread-1]
Inside Callback: [io-compute-6]

The execution context which we provided to scalaFutureToF merely ran the callback. The Future itself ran on our separate thread pool that stands in for ReactiveMongo's pool. In reality, you will have no control over this pool, as it comes from within ReactiveMongo.

Extra info

By the way, if you are not working with the type class hierarchy (F), but directly with IO values, then you can use this simplified method:

def scalaFutureToIo[A](future: => Future[A]): IO[A] =
  IO.fromFuture(IO(future))
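
For example, reusing the myFuture defined earlier:

val io: IO[Unit] = scalaFutureToIo(myFuture)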

Notice how this doesn't even require you to pass an ExecutionContext - it just uses your compute pool. Or, to be more precise, it uses whatever Async[IO] defines as def executionContext: F[ExecutionContext], which turns out to be the compute pool. Let's check:

override def run: IO[Unit] = {
  IO.executionContext.map(ec => println(ec == runtime.compute))
}
// prints true

Last, but not least:

If we really had a way of specifying which thread pool ReactiveMongo's underlying Netty should use, then yes, in that case we should definitely use a separate pool. We should never give our runtime.compute pool to other runtimes.