使用并行集合的 scala http 请求

scala http requests using parallel collections

我正在试验 Scala 并行集合。我正在尝试从本地服务器获取数据,我已经 设置,这是我的代码

val httpRequestInputs = List(inputs).par

def getResponse(data: String, url: String) = {
    val request = basicRequest.body(text).post(url)
      .headers(Map("content-type" -> "text/plain", "charset" -> "utf-8"))
      .response(asString)

    implicit val backend
    = HttpURLConnectionBackend(options = SttpBackendOptions.connectionTimeout(5.minutes))
    request.readTimeout(5.minutes).send().body
}



// program executes from here
  httpRequestInputs.foreach { input =>
      val result = getResponse(input, url)
      result match {
          case Right(value) => println(value)
          case Left(value) => println("error")
     }

当使用小尺寸输入时,没有问题,但是,当我尝试使用大尺寸输入时, 程序抛出SocketException,我检查了服务器,服务器没有错误, 在我看来,客户端正在提前关闭连接。而且,这些大量投入, 通常需要不到 90 秒的时间来获得响应,当 运行 单独时。

我尝试在 http 请求中扩展连接和读取超时选项,但我仍然得到 异常。

谁能帮我理解,为什么客户端要关闭连接?

对于 http 请求,我使用客户端 com.softwaremill.sttp.client

如果“大输入大小”意味着至少有几千个输入,并且每个输入都连接到同一个远程服务器,那么很可能您正在耗尽 运行 所在的临时端口范围this: 本质上有一个限制(从 OS 到 OS)在一定的时间长度内你可以建立到同一个远程主机和端口的连接数(Windows documentation, but every OS to my knowledge has similar limits) .

您需要捕获异常并重试,或者限制连接尝试,以免耗尽范围。 (在极少数情况下,如果您没有尝试超过限制,可能有一个 OS 配置选项可以让您增加限制)。

使用 Scala 标准库来限制这种情况的一个好方法是使用 Future:

import scala.concurrent.{ ExecutionContext, Future }

implicit val executionContext = ExecutionContext.fromExecutor(
  new java.util.concurrent.ForkJoinPool(1000) // Allow 1000 requests to be executing at once
)

val allRequestsFut =
  Future.sequence(
    httpRequestInputs.map { input =>
      Future { getResponse(input, url) }.map {
        _ match {
          case Right(value) => println(value)
          case Left(err) => println(s"error: $err")
        }
      }
    )

allRequestsFut.foreach { _ =>
  println("all requests complete")
}

请注意,许多 OS(包括 Linux)将在端口关闭后继续保留临时端口一段时间。要动态地限制请求,我建议使用类似 Akka Streams 的东西。