Akka Streams - 根据某些谓词拆分传入源数据
Akka Streams - Split incoming sources data based on some predicate
我是 Akka Streams 框架的新手,我想知道以 Akka Streams 为目标解决我的问题的可能性。
想象这里有一个问题:
- 几个大型迭代源,例如可能有 3-4 个大文件
相同的数据;
- 每个文件都需要处理:解析、转换、
计算平均值;
- 文件中的数据应按
一些谓词,然后由分区处理。分区谓词可以是动态的 运行 运行;
- 应该保存每个分区
到另一个文件或流,等等。
可以用Akka Streams解决吗?
您是否考虑过使用 Partition?
正如 Victor 所说,Partition 会为您做这件事。我在 Akka 单元测试中找到了一个例子:
val (s1, s2, s3) = RunnableGraph.fromGraph(GraphDSL.create(Sink.seq[Int], Sink.seq[Int], Sink.seq[Int])(Tuple3.apply) { implicit b ⇒ (sink1, sink2, sink3) ⇒
val partition = b.add(Partition[Int](3, {
case g if (g > 3) ⇒ 0
case l if (l < 3) ⇒ 1
case e if (e == 3) ⇒ 2
}))
Source(List(1, 2, 3, 4, 5)) ~> partition.in
partition.out(0) ~> sink1.in
partition.out(1) ~> sink2.in
partition.out(2) ~> sink3.in
ClosedShape
}).run()
我是 Akka Streams 框架的新手,我想知道以 Akka Streams 为目标解决我的问题的可能性。 想象这里有一个问题:
- 几个大型迭代源,例如可能有 3-4 个大文件 相同的数据;
- 每个文件都需要处理:解析、转换、 计算平均值;
- 文件中的数据应按 一些谓词,然后由分区处理。分区谓词可以是动态的 运行 运行;
- 应该保存每个分区 到另一个文件或流,等等。
可以用Akka Streams解决吗?
您是否考虑过使用 Partition?
正如 Victor 所说,Partition 会为您做这件事。我在 Akka 单元测试中找到了一个例子:
val (s1, s2, s3) = RunnableGraph.fromGraph(GraphDSL.create(Sink.seq[Int], Sink.seq[Int], Sink.seq[Int])(Tuple3.apply) { implicit b ⇒ (sink1, sink2, sink3) ⇒
val partition = b.add(Partition[Int](3, {
case g if (g > 3) ⇒ 0
case l if (l < 3) ⇒ 1
case e if (e == 3) ⇒ 2
}))
Source(List(1, 2, 3, 4, 5)) ~> partition.in
partition.out(0) ~> sink1.in
partition.out(1) ~> sink2.in
partition.out(2) ~> sink3.in
ClosedShape
}).run()