Akka-http: Http.cachedHostConnectionPoolHttps stops processing after some time for non-2XX responses

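I am trying to make outbound HTTPS requests using cachedHostConnectionPoolHttps, but after receiving non-2XX responses for a while, the pool flow stops emitting elements, which stalls the whole stream. The point at which this happens is fairly random, but it does happen and is reproducible.

Here is a sample test that stops emitting results after some time and fails with "Expected OnNext(_), yet no element signaled during 10 seconds":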

import java.util.UUID

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.RawHeader
import akka.stream._
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import org.scalatest._

import scala.util.Random

class HttpPoolTest extends FlatSpec with Matchers with OptionValues with Inside with Inspectors {

  "HttpPoolTest" should "run without exception" in {
    implicit val httpSystem = ActorSystem("http-out")
    implicit val httpMat = ActorMaterializer()
    implicit val ec = httpSystem.dispatcher

    val helloFlow = Http().cachedHostConnectionPoolHttps[Int]("android.googleapis.com",443)(httpMat)

    val requestHeaders = scala.collection.immutable.Seq[HttpHeader](RawHeader("Authorization", "key=random" ), RawHeader("Content-Type", "application/json;charset=utf-8"))
    val httpRequest = new HttpRequest(HttpMethods.POST, "/gcm/send", requestHeaders,
      """
        |{
        |    "registration_ids": ["rnadomdata"],
        |    "delay_while_idle": true,
        |    "data": {
        |        "message": "Hello World",
        |        "title": "Interstellar",
        |        "id": "1231222i3211232228w2t1xx6wxq",
        |        "notificationType": "Text"
        |    }
        |}
      """.stripMargin)

    val (upstream , downstream) = TestSource.probe[(HttpRequest, Int)].via(helloFlow).toMat(TestSink.probe)(Keep.both).run()


    while (true) {
      val randWait = math.max(5000, Random.nextInt(20000)) // at least 5 seconds to process the HTTP request
      val randRequest = math.max(1, Random.nextInt(5))
      downstream.request(randRequest)

      (1 to randRequest).foreach(i => upstream.sendNext(httpRequest -> UUID.randomUUID().hashCode()))
      println (s"Waiting for $randWait millis to process")
      Thread.sleep(randWait)
      (1 to randRequest) foreach (i => noException should be thrownBy downstream.expectNext())

    }
  }

}

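Is something wrong here? It works for a while, so I suspect something goes wrong in the handling of non-2XX errors.

I have tried setting akka.http.host-connection-pool.max-retries = 0 and akka.http.host-connection-pool.idle-timeout = infinite, but to no avail.

You need to consume the entity data bytes of every response to ensure the httpFlow keeps running. If you are not interested in interpreting the body of the httpResponse, simply read it through to the end and discard it.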

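// expectNext() yields (Try[HttpResponse], Int); drain the entity of each successful response (needs akka.stream.scaladsl.Sink)
downstream.expectNext()._1.foreach(_.entity.dataBytes.runWith(Sink.ignore))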
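
To make the draining part of the stream itself, rather than something each consumer has to remember, one option is a small flow placed directly after the pool. The following is a minimal sketch, not the only way to do it: DrainResponses and drainEntities are hypothetical names introduced here for illustration, and the sketch assumes you only need the status code of each response. It relies on the documented akka-http behaviour that a pooled connection is not freed for the next request until the response entity has been read.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.{HttpResponse, StatusCode}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

object DrainResponses {
  // Hypothetical helper (not part of akka-http): reads every response entity
  // to completion, whatever the status code, and passes on only the status.
  def drainEntities[T](parallelism: Int)(
      implicit mat: Materializer, ec: ExecutionContext
  ): Flow[(Try[HttpResponse], T), (Try[StatusCode], T), NotUsed] =
    Flow[(Try[HttpResponse], T)].mapAsync[(Try[StatusCode], T)](parallelism) {
      case (Success(response), context) =>
        // Running dataBytes into Sink.ignore consumes the whole body.
        response.entity.dataBytes
          .runWith(Sink.ignore)
          .map(_ => (Success(response.status), context))
      case (Failure(cause), context) =>
        // No response arrived, so there is no entity to drain.
        Future.successful((Failure(cause), context))
    }
}
```

In the test above this would be wired in as TestSource.probe[(HttpRequest, Int)].via(helloFlow).via(DrainResponses.drainEntities[Int](4)), with the sink probe then expecting (Try[StatusCode], Int) elements. If you do need the body, read it inside the Success branch instead of ignoring it; the point is only that every entity gets consumed.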