akka 将文件行流式传输到 actor 路由器并用单个 actor 写入。如何处理背压

akka streaming file lines to actor router and writing with single actor. how to handle the backpressure

我想将文件从 s3 流式传输到 actor 进行解析和丰富,并将输出写入其他文件。 应该限制​​ parserActor 的数量,例如 application.conf

akka{
    actor{
        deployment {
              HereClient/router1 {
                router = round-robin-pool
                nr-of-instances = 28
              }
        }
    }
}

code

val writerActor = actorSystem.actorOf(WriterActor.props())
val parser = actorSystem.actorOf(FromConfig.props(ParsingActor.props(writerActor)), "router1")

然而,正在写入文件的 actor 应限制为 1(单例)

我试过做类似

的事情
val reader: ParquetReader[GenericRecord] =  AvroParquetReader.builder[GenericRecord](file).withConf(conf).build()

  val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
source.map (record =>  record ! parser)

但我不确定是否正确处理了背压。有什么建议吗?

我认为您应该使用其中一种“异步”操作

也许这个 q/a 给你一些灵感 Processing an akka stream asynchronously and writing to a file sink

确实,您的解决方案是忽略背压。

在保持背压的同时让流与 actor 交互的正确方法是使用 ask pattern support of akka-stream (reference).

根据我对您示例的理解,您有 2 个独立的演员交互点:

  1. 将记录发送给解析参与者(通过路由器)
  2. 将已解析的记录发送到单例写入 actor

我会做的是类似下面的事情:

val writerActor = actorSystem.actorOf(WriterActor.props())
val parserActor = actorSystem.actorOf(FromConfig.props(ParsingActor.props(writerActor)), "router1")

val reader: ParquetReader[GenericRecord] =  AvroParquetReader.builder[GenericRecord](file).withConf(conf).build()

val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
source.ask[ParsedRecord](28)(parserActor)
      .ask[WriteAck](writerActor)
      .runWith(Sink.ignore)

我们的想法是将所有 GenericRecord 元素发送到 parserActor,后者将回复 ParsedRecord。作为示例,我们指定并行度为 28,因为这是您配置的实例数,但是只要您使用的值高于 actor 实例的实际数量,actor 就不会遭受工作饥饿。

一旦 parseActor 回复解析结果(此处由 ParsedRecord 表示),我们将应用相同的模式与单例编写器 actor 进行交互。请注意,这里我们没有指定并行性,因为我们只有一个实例,因此一次发送多于 1 条消息没有意义(实际上,由于在异步边界处进行缓冲,无论如何都会发生这种情况,但这只是一个内置优化)。在这种情况下,我们希望 writer actor 回复 WriteAck 以通知我们写入已成功,我们可以发送下一个元素。

使用这种方法,您可以在整个流中保持背压。