Alpakka S3 连接器流无法处理负载,抛出 akka.stream.BufferOverflowException
Alpakka S3 connector stream won't handle the load, throwing akka.stream.BufferOverflowException
我有一个 akka-http 服务,我正在尝试 alpakka s3 connector 上传文件。以前我使用的是临时文件,然后使用 Amazon SDK 上传。这种方法需要对 Amazon SDK 进行一些调整,使其更像 scala,但它甚至可以同时处理 1000 个请求。吞吐量并不惊人,但所有请求最终都通过了。这是更改前的代码,没有 alpakka:
```
path("uploadfile") {
withRequestTimeout(20.seconds) {
storeUploadedFile("csv", tempDestination) {
case (metadata, file) =>
val uploadFuture = upload(file, file.toPath.getFileName.toString)
onComplete(uploadFuture) {
case Success(_) => complete(StatusCodes.OK)
case Failure(_) => complete(StatusCodes.FailedDependency)
}
}
}
}
}
case class S3UploaderException(msg: String) extends Exception(msg)
def upload(file: File, key: String): Future[String] = {
val s3Client = AmazonS3ClientBuilder.standard()
.withCredentials(new DefaultAWSCredentialsProviderChain())
.withRegion(Regions.EU_WEST_3)
.build()
val promise = Promise[String]()
val listener = new ProgressListener() {
override def progressChanged(progressEvent: ProgressEvent): Unit = {
(progressEvent.getEventType: @unchecked) match {
case ProgressEventType.TRANSFER_FAILED_EVENT => promise.failure(S3UploaderException(s"Uploading a file with a key: $key"))
case ProgressEventType.TRANSFER_COMPLETED_EVENT |
ProgressEventType.TRANSFER_CANCELED_EVENT => promise.success(key)
}
}
}
val request = new PutObjectRequest("S3_BUCKET", key, file)
request.setGeneralProgressListener(listener)
s3Client.putObject(request)
promise.future
}
```
当我将其更改为使用 alpakka 连接器时,代码看起来更好,因为我们可以将 ByteSource
和 alpakka Sink
连接在一起。然而,这种方法无法处理如此大的负载。当我一次执行 1000 个请求(10 kb 文件)时,只有不到 10% 的请求通过,其余请求失败并出现异常:
akka.stream.alpakka.s3.impl.FailedUpload: Exceeded configured
max-open-requests value of [32]. This means that the request queue of
this pool
(HostConnectionPoolSetup(bargain-test.s3-eu-west-3.amazonaws.com,443,ConnectionPoolSetup(ConnectionPoolSettings(4,0,5,32,1,30
seconds,ClientConnectionSettings(Some(User-Agent: akka-http/10.1.3),10
seconds,1
minute,512,None,WebSocketSettings(,ping,Duration.Inf,akka.http.impl.settings.WebSocketSettingsImpl$$$Lambda87/1279590204@4d809f4c),List(),ParserSettings(2048,16,64,64,8192,64,8388608,256,1048576,Strict,RFC6265,true,Set(),Full,Error,Map(If-Range
-> 0, If-Modified-Since -> 0, If-Unmodified-Since -> 0, default -> 12, Content-MD5 -> 0, Date -> 0, If-Match -> 0, If-None-Match -> 0,
User-Agent ->
32),false,true,akka.util.ConstantFun$$$Lambda34/1539966798@69c23cd4,akka.util.ConstantFun$$$Lambda34/1539966798@69c23cd4,akka.util.ConstantFun$$$Lambda35/297570074@6b426c59),None,TCPTransport),New,1
second),akka.http.scaladsl.HttpsConnectionContext@7e0f3726,akka.event.MarkerLoggingAdapter@74f3a78b)))
has completely filled up because the pool currently does not process
requests fast enough to handle the incoming request load. Please retry
the request later. See
http://doc.akka.io/docs/akka-http/current/scala/http/client-side/pool-overflow.html
for more information.
Gatling 测试的摘要如下所示:
---- Response Time Distribution ----------------------------------------
t < 800 ms 0 ( 0%)
800 ms < t < 1200 ms 0 ( 0%)
t > 1200 ms 90 ( 9%)
failed 910 ( 91%)
当我同时执行 100 个请求时,有一半失败了。所以,还是比较满意的。
这是一个新代码:
```
path("uploadfile") {
withRequestTimeout(20.seconds) {
extractRequestContext { ctx =>
implicit val materializer = ctx.materializer
extractActorSystem { actorSystem =>
fileUpload("csv") {
case (metadata, byteSource) =>
val uploadFuture = byteSource.runWith(S3Uploader.sink("s3FileKey")(actorSystem, materializer))
onComplete(uploadFuture) {
case Success(_) => complete(StatusCodes.OK)
case Failure(_) => complete(StatusCodes.FailedDependency)
}
}
}
}
}
}
def sink(s3Key: String)(implicit as: ActorSystem, m: Materializer) = {
val regionProvider = new AwsRegionProvider {
def getRegion: String = Regions.EU_WEST_3.getName
}
val settings = new S3Settings(MemoryBufferType, None, new DefaultAWSCredentialsProviderChain(), regionProvider, false, None, ListBucketVersion2)
val s3Client = new S3Client(settings)(as, m)
s3Client.multipartUpload("S3_BUCKET", s3Key)
}
```
可以看到带有两个端点的完整代码here
我有几个问题。
1) 这是一项功能吗?这就是我们所说的背压吗?
2) 如果我希望此代码的行为类似于使用临时文件的旧方法(没有失败的请求并且所有请求都在某个时刻完成),我必须做什么?我试图为流实现一个队列(link 到下面的源代码),但这没有什么区别。代码可见here.
(* 免责声明 * 我仍然是一个 scala 新手,试图快速了解 akka 流并找到解决问题的方法。这段代码中很可能存在一些简单的错误。* 免责声明 *)
这是一个背压功能。
Exceeded configured max-open-requests value of [32]
在配置中 max-open-requests
默认设置为 32。
流用于处理大量数据,而不是每秒处理很多请求。
Akka 开发人员必须为 max-open-requests
提供一些东西。他们肯定出于某种原因选择 32。他们不知道它将用于什么。它可能一次发送 1000 个 32KB 文件还是 1000 个 1GB 文件?他们不知道。但他们仍然希望确保默认情况下(可能有 80% 的人使用默认设置)应用程序将得到妥善和安全的处理。所以他们不得不限制处理能力。
你要求“现在”做 1000 个,但我很确定 AWS 没有同时发送 1000 个文件,而是使用了一些队列,如果你有很多小文件要上传,这对你来说也是一个好案例。
但是根据你的情况调整它是完全没问题的!
如果您知道您的机器和目标将处理更多同时连接,您可以将数字更改为更高的值。
此外,对于很多 HTTP 调用,请使用 cached host connection pool。
我有一个 akka-http 服务,我正在尝试 alpakka s3 connector 上传文件。以前我使用的是临时文件,然后使用 Amazon SDK 上传。这种方法需要对 Amazon SDK 进行一些调整,使其更像 scala,但它甚至可以同时处理 1000 个请求。吞吐量并不惊人,但所有请求最终都通过了。这是更改前的代码,没有 alpakka:
```
path("uploadfile") {
withRequestTimeout(20.seconds) {
storeUploadedFile("csv", tempDestination) {
case (metadata, file) =>
val uploadFuture = upload(file, file.toPath.getFileName.toString)
onComplete(uploadFuture) {
case Success(_) => complete(StatusCodes.OK)
case Failure(_) => complete(StatusCodes.FailedDependency)
}
}
}
}
}
case class S3UploaderException(msg: String) extends Exception(msg)
def upload(file: File, key: String): Future[String] = {
val s3Client = AmazonS3ClientBuilder.standard()
.withCredentials(new DefaultAWSCredentialsProviderChain())
.withRegion(Regions.EU_WEST_3)
.build()
val promise = Promise[String]()
val listener = new ProgressListener() {
override def progressChanged(progressEvent: ProgressEvent): Unit = {
(progressEvent.getEventType: @unchecked) match {
case ProgressEventType.TRANSFER_FAILED_EVENT => promise.failure(S3UploaderException(s"Uploading a file with a key: $key"))
case ProgressEventType.TRANSFER_COMPLETED_EVENT |
ProgressEventType.TRANSFER_CANCELED_EVENT => promise.success(key)
}
}
}
val request = new PutObjectRequest("S3_BUCKET", key, file)
request.setGeneralProgressListener(listener)
s3Client.putObject(request)
promise.future
}
```
当我将其更改为使用 alpakka 连接器时,代码看起来更好,因为我们可以将 ByteSource
和 alpakka Sink
连接在一起。然而,这种方法无法处理如此大的负载。当我一次执行 1000 个请求(10 kb 文件)时,只有不到 10% 的请求通过,其余请求失败并出现异常:
akka.stream.alpakka.s3.impl.FailedUpload: Exceeded configured max-open-requests value of [32]. This means that the request queue of this pool (HostConnectionPoolSetup(bargain-test.s3-eu-west-3.amazonaws.com,443,ConnectionPoolSetup(ConnectionPoolSettings(4,0,5,32,1,30 seconds,ClientConnectionSettings(Some(User-Agent: akka-http/10.1.3),10 seconds,1 minute,512,None,WebSocketSettings(,ping,Duration.Inf,akka.http.impl.settings.WebSocketSettingsImpl$$$Lambda87/1279590204@4d809f4c),List(),ParserSettings(2048,16,64,64,8192,64,8388608,256,1048576,Strict,RFC6265,true,Set(),Full,Error,Map(If-Range -> 0, If-Modified-Since -> 0, If-Unmodified-Since -> 0, default -> 12, Content-MD5 -> 0, Date -> 0, If-Match -> 0, If-None-Match -> 0, User-Agent -> 32),false,true,akka.util.ConstantFun$$$Lambda34/1539966798@69c23cd4,akka.util.ConstantFun$$$Lambda34/1539966798@69c23cd4,akka.util.ConstantFun$$$Lambda35/297570074@6b426c59),None,TCPTransport),New,1 second),akka.http.scaladsl.HttpsConnectionContext@7e0f3726,akka.event.MarkerLoggingAdapter@74f3a78b))) has completely filled up because the pool currently does not process requests fast enough to handle the incoming request load. Please retry the request later. See http://doc.akka.io/docs/akka-http/current/scala/http/client-side/pool-overflow.html for more information.
Gatling 测试的摘要如下所示:
---- Response Time Distribution ---------------------------------------- t < 800 ms 0 ( 0%)
800 ms < t < 1200 ms 0 ( 0%)
t > 1200 ms 90 ( 9%)
failed 910 ( 91%)
当我同时执行 100 个请求时,有一半失败了。所以,还是比较满意的。
这是一个新代码: ```
path("uploadfile") {
withRequestTimeout(20.seconds) {
extractRequestContext { ctx =>
implicit val materializer = ctx.materializer
extractActorSystem { actorSystem =>
fileUpload("csv") {
case (metadata, byteSource) =>
val uploadFuture = byteSource.runWith(S3Uploader.sink("s3FileKey")(actorSystem, materializer))
onComplete(uploadFuture) {
case Success(_) => complete(StatusCodes.OK)
case Failure(_) => complete(StatusCodes.FailedDependency)
}
}
}
}
}
}
def sink(s3Key: String)(implicit as: ActorSystem, m: Materializer) = {
val regionProvider = new AwsRegionProvider {
def getRegion: String = Regions.EU_WEST_3.getName
}
val settings = new S3Settings(MemoryBufferType, None, new DefaultAWSCredentialsProviderChain(), regionProvider, false, None, ListBucketVersion2)
val s3Client = new S3Client(settings)(as, m)
s3Client.multipartUpload("S3_BUCKET", s3Key)
}
```
可以看到带有两个端点的完整代码here
我有几个问题。
1) 这是一项功能吗?这就是我们所说的背压吗?
2) 如果我希望此代码的行为类似于使用临时文件的旧方法(没有失败的请求并且所有请求都在某个时刻完成),我必须做什么?我试图为流实现一个队列(link 到下面的源代码),但这没有什么区别。代码可见here.
(* 免责声明 * 我仍然是一个 scala 新手,试图快速了解 akka 流并找到解决问题的方法。这段代码中很可能存在一些简单的错误。* 免责声明 *)
这是一个背压功能。
Exceeded configured max-open-requests value of [32]
在配置中 max-open-requests
默认设置为 32。
流用于处理大量数据,而不是每秒处理很多请求。
Akka 开发人员必须为 max-open-requests
提供一些东西。他们肯定出于某种原因选择 32。他们不知道它将用于什么。它可能一次发送 1000 个 32KB 文件还是 1000 个 1GB 文件?他们不知道。但他们仍然希望确保默认情况下(可能有 80% 的人使用默认设置)应用程序将得到妥善和安全的处理。所以他们不得不限制处理能力。
你要求“现在”做 1000 个,但我很确定 AWS 没有同时发送 1000 个文件,而是使用了一些队列,如果你有很多小文件要上传,这对你来说也是一个好案例。
但是根据你的情况调整它是完全没问题的! 如果您知道您的机器和目标将处理更多同时连接,您可以将数字更改为更高的值。
此外,对于很多 HTTP 调用,请使用 cached host connection pool。