使用并行集合的 scala http 请求
scala http requests using parallel collections
我正在试验 Scala 并行集合。我正在尝试从本地服务器获取数据,我已经
设置,这是我的代码
val httpRequestInputs = List(inputs).par
def getResponse(data: String, url: String) = {
val request = basicRequest.body(text).post(url)
.headers(Map("content-type" -> "text/plain", "charset" -> "utf-8"))
.response(asString)
implicit val backend
= HttpURLConnectionBackend(options = SttpBackendOptions.connectionTimeout(5.minutes))
request.readTimeout(5.minutes).send().body
}
// program executes from here
httpRequestInputs.foreach { input =>
val result = getResponse(input, url)
result match {
case Right(value) => println(value)
case Left(value) => println("error")
}
当使用小尺寸输入时,没有问题,但是,当我尝试使用大尺寸输入时,
程序抛出SocketException
,我检查了服务器,服务器没有错误,
在我看来,客户端正在提前关闭连接。而且,这些大量投入,
通常需要不到 90 秒的时间来获得响应,当 运行 单独时。
我尝试在 http 请求中扩展连接和读取超时选项,但我仍然得到
异常。
谁能帮我理解,为什么客户端要关闭连接?
对于 http 请求,我使用客户端 com.softwaremill.sttp.client
如果“大输入大小”意味着至少有几千个输入,并且每个输入都连接到同一个远程服务器,那么很可能您正在耗尽 运行 所在的临时端口范围this: 本质上有一个限制(从 OS 到 OS)在一定的时间长度内你可以建立到同一个远程主机和端口的连接数(Windows documentation, but every OS to my knowledge has similar limits) .
您需要捕获异常并重试,或者限制连接尝试,以免耗尽范围。 (在极少数情况下,如果您没有尝试超过限制,可能有一个 OS 配置选项可以让您增加限制)。
使用 Scala 标准库来限制这种情况的一个好方法是使用 Future
:
import scala.concurrent.{ ExecutionContext, Future }
implicit val executionContext = ExecutionContext.fromExecutor(
new java.util.concurrent.ForkJoinPool(1000) // Allow 1000 requests to be executing at once
)
val allRequestsFut =
Future.sequence(
httpRequestInputs.map { input =>
Future { getResponse(input, url) }.map {
_ match {
case Right(value) => println(value)
case Left(err) => println(s"error: $err")
}
}
)
allRequestsFut.foreach { _ =>
println("all requests complete")
}
请注意,许多 OS(包括 Linux)将在端口关闭后继续保留临时端口一段时间。要动态地限制请求,我建议使用类似 Akka Streams 的东西。
我正在试验 Scala 并行集合。我正在尝试从本地服务器获取数据,我已经 设置,这是我的代码
val httpRequestInputs = List(inputs).par
def getResponse(data: String, url: String) = {
val request = basicRequest.body(text).post(url)
.headers(Map("content-type" -> "text/plain", "charset" -> "utf-8"))
.response(asString)
implicit val backend
= HttpURLConnectionBackend(options = SttpBackendOptions.connectionTimeout(5.minutes))
request.readTimeout(5.minutes).send().body
}
// program executes from here
httpRequestInputs.foreach { input =>
val result = getResponse(input, url)
result match {
case Right(value) => println(value)
case Left(value) => println("error")
}
当使用小尺寸输入时,没有问题,但是,当我尝试使用大尺寸输入时,
程序抛出SocketException
,我检查了服务器,服务器没有错误,
在我看来,客户端正在提前关闭连接。而且,这些大量投入,
通常需要不到 90 秒的时间来获得响应,当 运行 单独时。
我尝试在 http 请求中扩展连接和读取超时选项,但我仍然得到 异常。
谁能帮我理解,为什么客户端要关闭连接?
对于 http 请求,我使用客户端 com.softwaremill.sttp.client
如果“大输入大小”意味着至少有几千个输入,并且每个输入都连接到同一个远程服务器,那么很可能您正在耗尽 运行 所在的临时端口范围this: 本质上有一个限制(从 OS 到 OS)在一定的时间长度内你可以建立到同一个远程主机和端口的连接数(Windows documentation, but every OS to my knowledge has similar limits) .
您需要捕获异常并重试,或者限制连接尝试,以免耗尽范围。 (在极少数情况下,如果您没有尝试超过限制,可能有一个 OS 配置选项可以让您增加限制)。
使用 Scala 标准库来限制这种情况的一个好方法是使用 Future
:
import scala.concurrent.{ ExecutionContext, Future }
implicit val executionContext = ExecutionContext.fromExecutor(
new java.util.concurrent.ForkJoinPool(1000) // Allow 1000 requests to be executing at once
)
val allRequestsFut =
Future.sequence(
httpRequestInputs.map { input =>
Future { getResponse(input, url) }.map {
_ match {
case Right(value) => println(value)
case Left(err) => println(s"error: $err")
}
}
)
allRequestsFut.foreach { _ =>
println("all requests complete")
}
请注意,许多 OS(包括 Linux)将在端口关闭后继续保留临时端口一段时间。要动态地限制请求,我建议使用类似 Akka Streams 的东西。