使用 actor 互操作从 s3 流式传输文件时如何处理背压

How to handle backpressure when Streaming file from s3 with actor interop

我正在尝试从 S3 下载一个大文件并将其数据发送给另一个执行 http 请求的参与者,然后保存响应。我想限制该演员发送的请求数量,因此我需要处理背压。
我试过这样做:

 S3.download(bckt, bcktKey).map{
      case Some((file, _)) =>
        file
          .via(CsvParsing.lineScanner())
          .map(_.map(_.utf8String)).drop(1)//drop headers
          .map(p => Foo(p.head, p(1)))
          .mapAsync(30) { p =>
            implicit val askTimeout: Timeout = Timeout(10 seconds)
            (httpClientActor ? p).mapTo[Buzz]
          }
          .mapAsync(1){
          case b@Buzz(_, _) =>
            (persistActor ? b).mapTo[Done]
        }.runWith(Sink.head)

问题是我看到它只从文件中读取 30 行作为并行度的限制集。我不确定这是实现我正在寻找的目标的正确方法

如果原因不是我在评论中提到的 Sink.head 的使用,您可以使用 Sink.actorRefWithBackpressure.

对流进行反压

示例代码:

class PersistActor extends Actor {

    override def receive: Receive = {
      case "init" =>
        println("Initialized")
      case "complete" =>
        context.stop(self)
      case message =>
        //Persist Buzz??
        sender() ! Done
    }
 }

 val sink = Sink
   .actorRefWithBackpressure(persistActor, "init", Done, "complete", PartialFunction.empty)

S3.download(bckt, bcktKey).map{
  case Some((file, _)) =>
    file
      .via(CsvParsing.lineScanner())
      .map(_.map(_.utf8String)).drop(1)//drop headers
      .map(p => Foo(p.head, p(1)))
      //You could backpressure here too...
      .mapAsync(30) { p =>
        implicit val askTimeout: Timeout = Timeout(10 seconds)
        (httpClientActor ? p).mapTo[Buzz]
      }
      .to(sink)
      .run()

正如 Johny 在他的评论中指出的那样,Sink.head 是导致流仅处理大约 30 个元素的原因。发生的情况大约是:

  • Sink.head 表示对 1 个元素的需求
  • 这个需求向上传播到第二个mapAsync
  • 当需求到达第一个 mapAsync 时,因为它的并行度为 30,它表示需要 30 个元素
  • CSV 解析阶段发出 30 个元素
  • 当接收到来自客户端 actor 的第一个元素对请求的响应时,响应向下传播到持久 actor 的请求
  • 要求从 CSV 解析阶段发出另一个元素的信号
  • 当 persist actor 响应时,响应进入接收器
  • 由于接收器是 Sink.head,一旦它接收到一个元素就会取消流,流就会被拆除
  • 任何已发送但正在等待响应的客户端参与者的请求仍将得到处理

持久化 actor 的响应与 CSV 解析和向客户端 actor 发送请求之间存在一些竞争:如果后者更快,客户端 actor 可能会处理 31 行。

如果您只想在处理完每个元素后得到一个 Future[Done]Sink.last 将非常适合此代码。