http4s - 如何创建线程数有限的 blaze 客户端?

http4s - how create blaze client with limited count of threads?

我正在尝试创建线程数有限的 blaze 客户端,如下所示:

object ReactiveCats extends IOApp {
  private val PORT = 8083
  private val DELAY_SERVICE_URL = "http://localhost:8080"
  
  // trying create client with limited number of threads
  val clientPool: ExecutorService = Executors.newFixedThreadPool(64)
  val clientExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(clientPool)

  private val httpClient = BlazeClientBuilder[IO](clientExecutor).resource

  private val httpApp = HttpRoutes.of[IO] {
    case GET -> Root / delayMillis =>
      httpClient.use { client =>
        client
          .expect[String](s"$DELAY_SERVICE_URL/$delayMillis")
          .flatMap(response => Ok(s"ReactiveCats: $response"))
      }
  }.orNotFound

  // trying to create server on fixed thread pool
  val serverPool: ExecutorService = Executors.newFixedThreadPool(64)
  val serverExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(serverPool)

  // start server
  override def run(args: List[String]): IO[ExitCode] =
    BlazeServerBuilder[IO](serverExecutor)
      .bindHttp(port = PORT, host = "localhost")
      .withHttpApp(httpApp)
      .serve
      .compile
      .drain
      .as(ExitCode.Success)
}

full code and load-tests

但是负载测试结果看起来像是一个线程一个请求:

如何限制我的 blaze 客户端的线程数?

您的代码有两个明显的错误:

  1. 您正在创建一个 Executor,但在完成后没有将其关闭。
  2. 您正在 HTTP 路由中使用 httpClient 资源的 use 方法,这意味着每次调用路由时,它都会创建、使用和销毁 http 客户端。您应该在启动期间创建一次。

与任何其他资源(例如文件句柄等)一样,执行程序应始终使用 Resource.make 分配,如下所示:

  val clientPool: Resource[IO, ExecutorService] = Resource.make(IO(Executors.newFixedThreadPool(64)))(ex => IO(ex.shutdown()))
  val clientExecutor: Resource[IO, ExecutionContextExecutor] = clientPool.map(ExecutionContext.fromExecutor)

  private val httpClient = clientExecutor.flatMap(ex => BlazeClientBuilder[IO](ex).resource)

第二个问题可以通过在构建 HTTP 应用程序之前分配 httpClient 轻松解决:

  private def httpApp(client: Client[IO]): Kleisli[IO, Request[IO], Response[IO]] = HttpRoutes.of[IO] {
    case GET -> Root / delayMillis =>
      client
        .expect[String](s"$DELAY_SERVICE_URL/$delayMillis")
        .flatMap(response => Ok(s"ReactiveCats: $response"))
  }.orNotFound

…

  override def run(args: List[String]): IO[ExitCode] =
    httpClient.use { client =>
      BlazeServerBuilder[IO](serverExecutor)
        .bindHttp(port = PORT, host = "localhost")
        .withHttpApp(httpApp(client))
        .serve
        .compile
        .drain
        .as(ExitCode.Success)
    }

另一个潜在的问题是您正在使用 IOApp,它带有自己的线程池。解决这个问题的最好方法可能是混合 IOApp.WithContext 特性并实现此方法:

  override protected def executionContextResource: Resource[SyncIO, ExecutionContext] = ???

从我的评论中复制。

已为 Blaze 客户端正确设置性能问题的答案 - 对我来说这是 .withMaxWaitQueueLimit(1024) 参数。