actor 在发送 Single 时得到 BufferOverflowException

actor getting BufferOverflowException when sending Single

我正在尝试从 akka actor 发送数百个 http 请求,但是我收到了

akka.stream.BufferOverflowException: Exceeded configured max-open-requests value of [16]. This means that the request queue of this pool (HostConnectionPoolSetup(places.api.here.com,443,ConnectionPoolSetup(ConnectionPoolSettings(16,1,5,16,1,Duration.Inf,100 milliseconds,2 minutes,30 seconds,ClientConnectionSettings(Some(User-Agent: akka-http/10.2.0)...

这个application.conf

   http {
          host-connection-pool {
            max-connections = 16
            min-connections = 1
            max-open-requests = 16
          }
        }

这是代码

override def receive: Receive = {
      case Foo(_) => 
       val res: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://..."))
   // do something for the result

我试图通过状态来控制,例如

override def receive: Receive = run(0)
def run(openRequests: Int) : Receive = {
  case Foo(_) if openRequests <= 16 => 
     context.become(run(openRequests + 1))
       val responseFuture: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://..."))
       responseFuture.foreach(context.become(run(openRequests - 1)))
        //...

无论哪种方式,我都得到了 BufferOverflowException

的相同异常

任何建议将不胜感激

Future 中异步使用 context 是个坏主意。 context 仅在调用 actor 期间有效。

错误在于 context.become(run(openRequests - 1)) 在创建 Future 时使用 openRequests 的值,而不是调用它时的值。因此,当第一个请求完成时,它会调用 context.become(run(-1))(这显然是虚假的),即使可能有 15 个未完成的请求。

解决方法是在foreach中给自己发私信,而不是直接调用context.become。当 actor 处理该消息时,它会减少 current 请求计数,并在必要时发送新请求。