akka 将文件行流式传输到 actor 路由器并用单个 actor 写入。如何处理背压
akka streaming file lines to actor router and writing with single actor. how to handle the backpressure
我想将文件从 s3 流式传输到 actor 进行解析和丰富,并将输出写入其他文件。
应该限制 parserActor 的数量,例如
application.conf
akka{
actor{
deployment {
HereClient/router1 {
router = round-robin-pool
nr-of-instances = 28
}
}
}
}
code
val writerActor = actorSystem.actorOf(WriterActor.props())
val parser = actorSystem.actorOf(FromConfig.props(ParsingActor.props(writerActor)), "router1")
然而,正在写入文件的 actor 应限制为 1(单例)
我试过做类似
的事情
val reader: ParquetReader[GenericRecord] = AvroParquetReader.builder[GenericRecord](file).withConf(conf).build()
val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
source.map (record => record ! parser)
但我不确定是否正确处理了背压。有什么建议吗?
我认为您应该使用其中一种“异步”操作
也许这个 q/a 给你一些灵感 Processing an akka stream asynchronously and writing to a file sink
确实,您的解决方案是忽略背压。
在保持背压的同时让流与 actor 交互的正确方法是使用 ask pattern support of akka-stream (reference).
根据我对您示例的理解,您有 2 个独立的演员交互点:
- 将记录发送给解析参与者(通过路由器)
- 将已解析的记录发送到单例写入 actor
我会做的是类似下面的事情:
val writerActor = actorSystem.actorOf(WriterActor.props())
val parserActor = actorSystem.actorOf(FromConfig.props(ParsingActor.props(writerActor)), "router1")
val reader: ParquetReader[GenericRecord] = AvroParquetReader.builder[GenericRecord](file).withConf(conf).build()
val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
source.ask[ParsedRecord](28)(parserActor)
.ask[WriteAck](writerActor)
.runWith(Sink.ignore)
我们的想法是将所有 GenericRecord
元素发送到 parserActor
,后者将回复 ParsedRecord
。作为示例,我们指定并行度为 28,因为这是您配置的实例数,但是只要您使用的值高于 actor 实例的实际数量,actor 就不会遭受工作饥饿。
一旦 parseActor
回复解析结果(此处由 ParsedRecord
表示),我们将应用相同的模式与单例编写器 actor 进行交互。请注意,这里我们没有指定并行性,因为我们只有一个实例,因此一次发送多于 1 条消息没有意义(实际上,由于在异步边界处进行缓冲,无论如何都会发生这种情况,但这只是一个内置优化)。在这种情况下,我们希望 writer actor 回复 WriteAck
以通知我们写入已成功,我们可以发送下一个元素。
使用这种方法,您可以在整个流中保持背压。
我想将文件从 s3 流式传输到 actor 进行解析和丰富,并将输出写入其他文件。
应该限制 parserActor 的数量,例如
application.conf
akka{
actor{
deployment {
HereClient/router1 {
router = round-robin-pool
nr-of-instances = 28
}
}
}
}
code
val writerActor = actorSystem.actorOf(WriterActor.props())
val parser = actorSystem.actorOf(FromConfig.props(ParsingActor.props(writerActor)), "router1")
然而,正在写入文件的 actor 应限制为 1(单例)
我试过做类似
的事情val reader: ParquetReader[GenericRecord] = AvroParquetReader.builder[GenericRecord](file).withConf(conf).build()
val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
source.map (record => record ! parser)
但我不确定是否正确处理了背压。有什么建议吗?
我认为您应该使用其中一种“异步”操作
也许这个 q/a 给你一些灵感 Processing an akka stream asynchronously and writing to a file sink
确实,您的解决方案是忽略背压。
在保持背压的同时让流与 actor 交互的正确方法是使用 ask pattern support of akka-stream (reference).
根据我对您示例的理解,您有 2 个独立的演员交互点:
- 将记录发送给解析参与者(通过路由器)
- 将已解析的记录发送到单例写入 actor
我会做的是类似下面的事情:
val writerActor = actorSystem.actorOf(WriterActor.props())
val parserActor = actorSystem.actorOf(FromConfig.props(ParsingActor.props(writerActor)), "router1")
val reader: ParquetReader[GenericRecord] = AvroParquetReader.builder[GenericRecord](file).withConf(conf).build()
val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
source.ask[ParsedRecord](28)(parserActor)
.ask[WriteAck](writerActor)
.runWith(Sink.ignore)
我们的想法是将所有 GenericRecord
元素发送到 parserActor
,后者将回复 ParsedRecord
。作为示例,我们指定并行度为 28,因为这是您配置的实例数,但是只要您使用的值高于 actor 实例的实际数量,actor 就不会遭受工作饥饿。
一旦 parseActor
回复解析结果(此处由 ParsedRecord
表示),我们将应用相同的模式与单例编写器 actor 进行交互。请注意,这里我们没有指定并行性,因为我们只有一个实例,因此一次发送多于 1 条消息没有意义(实际上,由于在异步边界处进行缓冲,无论如何都会发生这种情况,但这只是一个内置优化)。在这种情况下,我们希望 writer actor 回复 WriteAck
以通知我们写入已成功,我们可以发送下一个元素。
使用这种方法,您可以在整个流中保持背压。