http4s - 如何创建线程数有限的 blaze 客户端?
http4s - how create blaze client with limited count of threads?
我正在尝试创建线程数有限的 blaze 客户端,如下所示:
object ReactiveCats extends IOApp {
private val PORT = 8083
private val DELAY_SERVICE_URL = "http://localhost:8080"
// trying create client with limited number of threads
val clientPool: ExecutorService = Executors.newFixedThreadPool(64)
val clientExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(clientPool)
private val httpClient = BlazeClientBuilder[IO](clientExecutor).resource
private val httpApp = HttpRoutes.of[IO] {
case GET -> Root / delayMillis =>
httpClient.use { client =>
client
.expect[String](s"$DELAY_SERVICE_URL/$delayMillis")
.flatMap(response => Ok(s"ReactiveCats: $response"))
}
}.orNotFound
// trying to create server on fixed thread pool
val serverPool: ExecutorService = Executors.newFixedThreadPool(64)
val serverExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(serverPool)
// start server
override def run(args: List[String]): IO[ExitCode] =
BlazeServerBuilder[IO](serverExecutor)
.bindHttp(port = PORT, host = "localhost")
.withHttpApp(httpApp)
.serve
.compile
.drain
.as(ExitCode.Success)
}
但是负载测试结果看起来像是一个线程一个请求:
如何限制我的 blaze 客户端的线程数?
您的代码有两个明显的错误:
- 您正在创建一个 Executor,但在完成后没有将其关闭。
- 您正在 HTTP 路由中使用
httpClient
资源的 use
方法,这意味着每次调用路由时,它都会创建、使用和销毁 http 客户端。您应该在启动期间创建一次。
与任何其他资源(例如文件句柄等)一样,执行程序应始终使用 Resource.make
分配,如下所示:
val clientPool: Resource[IO, ExecutorService] = Resource.make(IO(Executors.newFixedThreadPool(64)))(ex => IO(ex.shutdown()))
val clientExecutor: Resource[IO, ExecutionContextExecutor] = clientPool.map(ExecutionContext.fromExecutor)
private val httpClient = clientExecutor.flatMap(ex => BlazeClientBuilder[IO](ex).resource)
第二个问题可以通过在构建 HTTP 应用程序之前分配 httpClient 轻松解决:
private def httpApp(client: Client[IO]): Kleisli[IO, Request[IO], Response[IO]] = HttpRoutes.of[IO] {
case GET -> Root / delayMillis =>
client
.expect[String](s"$DELAY_SERVICE_URL/$delayMillis")
.flatMap(response => Ok(s"ReactiveCats: $response"))
}.orNotFound
…
override def run(args: List[String]): IO[ExitCode] =
httpClient.use { client =>
BlazeServerBuilder[IO](serverExecutor)
.bindHttp(port = PORT, host = "localhost")
.withHttpApp(httpApp(client))
.serve
.compile
.drain
.as(ExitCode.Success)
}
另一个潜在的问题是您正在使用 IOApp
,它带有自己的线程池。解决这个问题的最好方法可能是混合 IOApp.WithContext
特性并实现此方法:
override protected def executionContextResource: Resource[SyncIO, ExecutionContext] = ???
从我的评论中复制。
已为 Blaze 客户端正确设置性能问题的答案 - 对我来说这是 .withMaxWaitQueueLimit(1024) 参数。
我正在尝试创建线程数有限的 blaze 客户端,如下所示:
object ReactiveCats extends IOApp {
private val PORT = 8083
private val DELAY_SERVICE_URL = "http://localhost:8080"
// trying create client with limited number of threads
val clientPool: ExecutorService = Executors.newFixedThreadPool(64)
val clientExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(clientPool)
private val httpClient = BlazeClientBuilder[IO](clientExecutor).resource
private val httpApp = HttpRoutes.of[IO] {
case GET -> Root / delayMillis =>
httpClient.use { client =>
client
.expect[String](s"$DELAY_SERVICE_URL/$delayMillis")
.flatMap(response => Ok(s"ReactiveCats: $response"))
}
}.orNotFound
// trying to create server on fixed thread pool
val serverPool: ExecutorService = Executors.newFixedThreadPool(64)
val serverExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(serverPool)
// start server
override def run(args: List[String]): IO[ExitCode] =
BlazeServerBuilder[IO](serverExecutor)
.bindHttp(port = PORT, host = "localhost")
.withHttpApp(httpApp)
.serve
.compile
.drain
.as(ExitCode.Success)
}
但是负载测试结果看起来像是一个线程一个请求:
如何限制我的 blaze 客户端的线程数?
您的代码有两个明显的错误:
- 您正在创建一个 Executor,但在完成后没有将其关闭。
- 您正在 HTTP 路由中使用
httpClient
资源的use
方法,这意味着每次调用路由时,它都会创建、使用和销毁 http 客户端。您应该在启动期间创建一次。
与任何其他资源(例如文件句柄等)一样,执行程序应始终使用 Resource.make
分配,如下所示:
val clientPool: Resource[IO, ExecutorService] = Resource.make(IO(Executors.newFixedThreadPool(64)))(ex => IO(ex.shutdown()))
val clientExecutor: Resource[IO, ExecutionContextExecutor] = clientPool.map(ExecutionContext.fromExecutor)
private val httpClient = clientExecutor.flatMap(ex => BlazeClientBuilder[IO](ex).resource)
第二个问题可以通过在构建 HTTP 应用程序之前分配 httpClient 轻松解决:
private def httpApp(client: Client[IO]): Kleisli[IO, Request[IO], Response[IO]] = HttpRoutes.of[IO] {
case GET -> Root / delayMillis =>
client
.expect[String](s"$DELAY_SERVICE_URL/$delayMillis")
.flatMap(response => Ok(s"ReactiveCats: $response"))
}.orNotFound
…
override def run(args: List[String]): IO[ExitCode] =
httpClient.use { client =>
BlazeServerBuilder[IO](serverExecutor)
.bindHttp(port = PORT, host = "localhost")
.withHttpApp(httpApp(client))
.serve
.compile
.drain
.as(ExitCode.Success)
}
另一个潜在的问题是您正在使用 IOApp
,它带有自己的线程池。解决这个问题的最好方法可能是混合 IOApp.WithContext
特性并实现此方法:
override protected def executionContextResource: Resource[SyncIO, ExecutionContext] = ???
从我的评论中复制。
已为 Blaze 客户端正确设置性能问题的答案 - 对我来说这是 .withMaxWaitQueueLimit(1024) 参数。