使用 actor 互操作从 s3 流式传输文件时如何处理背压
How to handle backpressure when Streaming file from s3 with actor interop
我正在尝试从 S3 下载一个大文件并将其数据发送给另一个执行 http 请求的参与者,然后保存响应。我想限制该演员发送的请求数量,因此我需要处理背压。
我试过这样做:
S3.download(bckt, bcktKey).map{
case Some((file, _)) =>
file
.via(CsvParsing.lineScanner())
.map(_.map(_.utf8String)).drop(1)//drop headers
.map(p => Foo(p.head, p(1)))
.mapAsync(30) { p =>
implicit val askTimeout: Timeout = Timeout(10 seconds)
(httpClientActor ? p).mapTo[Buzz]
}
.mapAsync(1){
case b@Buzz(_, _) =>
(persistActor ? b).mapTo[Done]
}.runWith(Sink.head)
问题是我看到它只从文件中读取 30 行作为并行度的限制集。我不确定这是实现我正在寻找的目标的正确方法
如果原因不是我在评论中提到的 Sink.head
的使用,您可以使用 Sink.actorRefWithBackpressure
.
对流进行反压
示例代码:
class PersistActor extends Actor {
override def receive: Receive = {
case "init" =>
println("Initialized")
case "complete" =>
context.stop(self)
case message =>
//Persist Buzz??
sender() ! Done
}
}
val sink = Sink
.actorRefWithBackpressure(persistActor, "init", Done, "complete", PartialFunction.empty)
S3.download(bckt, bcktKey).map{
case Some((file, _)) =>
file
.via(CsvParsing.lineScanner())
.map(_.map(_.utf8String)).drop(1)//drop headers
.map(p => Foo(p.head, p(1)))
//You could backpressure here too...
.mapAsync(30) { p =>
implicit val askTimeout: Timeout = Timeout(10 seconds)
(httpClientActor ? p).mapTo[Buzz]
}
.to(sink)
.run()
正如 Johny 在他的评论中指出的那样,Sink.head
是导致流仅处理大约 30 个元素的原因。发生的情况大约是:
Sink.head
表示对 1 个元素的需求
- 这个需求向上传播到第二个
mapAsync
- 当需求到达第一个
mapAsync
时,因为它的并行度为 30,它表示需要 30 个元素
- CSV 解析阶段发出 30 个元素
- 当接收到来自客户端 actor 的第一个元素对请求的响应时,响应向下传播到持久 actor 的请求
- 要求从 CSV 解析阶段发出另一个元素的信号
- 当 persist actor 响应时,响应进入接收器
- 由于接收器是
Sink.head
,一旦它接收到一个元素就会取消流,流就会被拆除
- 任何已发送但正在等待响应的客户端参与者的请求仍将得到处理
持久化 actor 的响应与 CSV 解析和向客户端 actor 发送请求之间存在一些竞争:如果后者更快,客户端 actor 可能会处理 31 行。
如果您只想在处理完每个元素后得到一个 Future[Done]
,Sink.last
将非常适合此代码。
我正在尝试从 S3 下载一个大文件并将其数据发送给另一个执行 http 请求的参与者,然后保存响应。我想限制该演员发送的请求数量,因此我需要处理背压。
我试过这样做:
S3.download(bckt, bcktKey).map{
case Some((file, _)) =>
file
.via(CsvParsing.lineScanner())
.map(_.map(_.utf8String)).drop(1)//drop headers
.map(p => Foo(p.head, p(1)))
.mapAsync(30) { p =>
implicit val askTimeout: Timeout = Timeout(10 seconds)
(httpClientActor ? p).mapTo[Buzz]
}
.mapAsync(1){
case b@Buzz(_, _) =>
(persistActor ? b).mapTo[Done]
}.runWith(Sink.head)
问题是我看到它只从文件中读取 30 行作为并行度的限制集。我不确定这是实现我正在寻找的目标的正确方法
如果原因不是我在评论中提到的 Sink.head
的使用,您可以使用 Sink.actorRefWithBackpressure
.
示例代码:
class PersistActor extends Actor {
override def receive: Receive = {
case "init" =>
println("Initialized")
case "complete" =>
context.stop(self)
case message =>
//Persist Buzz??
sender() ! Done
}
}
val sink = Sink
.actorRefWithBackpressure(persistActor, "init", Done, "complete", PartialFunction.empty)
S3.download(bckt, bcktKey).map{
case Some((file, _)) =>
file
.via(CsvParsing.lineScanner())
.map(_.map(_.utf8String)).drop(1)//drop headers
.map(p => Foo(p.head, p(1)))
//You could backpressure here too...
.mapAsync(30) { p =>
implicit val askTimeout: Timeout = Timeout(10 seconds)
(httpClientActor ? p).mapTo[Buzz]
}
.to(sink)
.run()
正如 Johny 在他的评论中指出的那样,Sink.head
是导致流仅处理大约 30 个元素的原因。发生的情况大约是:
Sink.head
表示对 1 个元素的需求- 这个需求向上传播到第二个
mapAsync
- 当需求到达第一个
mapAsync
时,因为它的并行度为 30,它表示需要 30 个元素 - CSV 解析阶段发出 30 个元素
- 当接收到来自客户端 actor 的第一个元素对请求的响应时,响应向下传播到持久 actor 的请求
- 要求从 CSV 解析阶段发出另一个元素的信号
- 当 persist actor 响应时,响应进入接收器
- 由于接收器是
Sink.head
,一旦它接收到一个元素就会取消流,流就会被拆除 - 任何已发送但正在等待响应的客户端参与者的请求仍将得到处理
持久化 actor 的响应与 CSV 解析和向客户端 actor 发送请求之间存在一些竞争:如果后者更快,客户端 actor 可能会处理 31 行。
如果您只想在处理完每个元素后得到一个 Future[Done]
,Sink.last
将非常适合此代码。