actor 在发送 Single 时得到 BufferOverflowException
actor getting BufferOverflowException when sending Single
我正在尝试从 akka actor 发送数百个 http 请求,但是我收到了
akka.stream.BufferOverflowException: Exceeded configured max-open-requests value of [16]. This means that the request queue of this pool (HostConnectionPoolSetup(places.api.here.com,443,ConnectionPoolSetup(ConnectionPoolSettings(16,1,5,16,1,Duration.Inf,100 milliseconds,2 minutes,30 seconds,ClientConnectionSettings(Some(User-Agent: akka-http/10.2.0)...
这个application.conf
http {
host-connection-pool {
max-connections = 16
min-connections = 1
max-open-requests = 16
}
}
这是代码
override def receive: Receive = {
case Foo(_) =>
val res: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://..."))
// do something for the result
我试图通过状态来控制,例如
override def receive: Receive = run(0)
def run(openRequests: Int) : Receive = {
case Foo(_) if openRequests <= 16 =>
context.become(run(openRequests + 1))
val responseFuture: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://..."))
responseFuture.foreach(context.become(run(openRequests - 1)))
//...
无论哪种方式,我都得到了 BufferOverflowException
的相同异常
任何建议将不胜感激
在 Future
中异步使用 context
是个坏主意。 context
仅在调用 actor 期间有效。
错误在于 context.become(run(openRequests - 1))
在创建 Future
时使用 openRequests
的值,而不是调用它时的值。因此,当第一个请求完成时,它会调用 context.become(run(-1))
(这显然是虚假的),即使可能有 15 个未完成的请求。
解决方法是在foreach
中给自己发私信,而不是直接调用context.become
。当 actor 处理该消息时,它会减少 current 请求计数,并在必要时发送新请求。
我正在尝试从 akka actor 发送数百个 http 请求,但是我收到了
akka.stream.BufferOverflowException: Exceeded configured max-open-requests value of [16]. This means that the request queue of this pool (HostConnectionPoolSetup(places.api.here.com,443,ConnectionPoolSetup(ConnectionPoolSettings(16,1,5,16,1,Duration.Inf,100 milliseconds,2 minutes,30 seconds,ClientConnectionSettings(Some(User-Agent: akka-http/10.2.0)...
这个application.conf
http {
host-connection-pool {
max-connections = 16
min-connections = 1
max-open-requests = 16
}
}
这是代码
override def receive: Receive = {
case Foo(_) =>
val res: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://..."))
// do something for the result
我试图通过状态来控制,例如
override def receive: Receive = run(0)
def run(openRequests: Int) : Receive = {
case Foo(_) if openRequests <= 16 =>
context.become(run(openRequests + 1))
val responseFuture: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://..."))
responseFuture.foreach(context.become(run(openRequests - 1)))
//...
无论哪种方式,我都得到了 BufferOverflowException
任何建议将不胜感激
在 Future
中异步使用 context
是个坏主意。 context
仅在调用 actor 期间有效。
错误在于 context.become(run(openRequests - 1))
在创建 Future
时使用 openRequests
的值,而不是调用它时的值。因此,当第一个请求完成时,它会调用 context.become(run(-1))
(这显然是虚假的),即使可能有 15 个未完成的请求。
解决方法是在foreach
中给自己发私信,而不是直接调用context.become
。当 actor 处理该消息时,它会减少 current 请求计数,并在必要时发送新请求。