计算 Akka Streams 中的元素数量
Count number of elements in Akka Streams
我正在尝试通过 Alpakka 的 CsvFormatting
将 Scala 实体的 Source
转换为 ByteString
的 Source
并计算初始流中的元素数量。你能建议最好的方法来计算 initialSource
元素并将结果保持为 ByteString
Source
:
val initialSource: Source[SomeEntity, NotUsed] = Source.fromPublisher(publisher)
val csvSource: Source[ByteString, NotUsed] = initialSource
.map(e => List(e.firstName, e.lastName, e.city))
.via(CsvFormatting.format())
要计算流中的元素,必须 运行 流。一种方法是将流元素广播到两个接收器:一个接收器是主要处理的结果,另一个接收器只是计算元素的数量。这是一个简单的示例,它使用 graph 来获取两个接收器的物化值:
val sink1 = Sink.foreach(println)
val sink2 = Sink.fold[Int, ByteString](0)((acc, _) => acc + 1)
val g = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) { implicit builder =>
(s1, s2) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[ByteString](2))
val source: Source[ByteString, NotUsed] =
Source(1 to 10)
.map(i => List(i.toString))
.via(CsvFormatting.format())
source ~> broadcast.in
broadcast.out(0) ~> s1
broadcast.out(1) ~> s2
ClosedShape
}) // RunnableGraph[(Future[Done], Future[Int])]
val (fut1, fut2) = g.run()
fut2 onComplete {
case Success(count) => println(s"Number of elements: $count")
case Failure(_) =>
}
在上面的示例中,第一个接收器仅打印流元素并具有 Future[Done]
类型的物化值。第二个接收器执行折叠操作以计算流元素并具有类型 Future[Int]
的物化值。打印如下:
ByteString(49, 13, 10)
ByteString(50, 13, 10)
ByteString(51, 13, 10)
ByteString(52, 13, 10)
ByteString(53, 13, 10)
ByteString(54, 13, 10)
ByteString(55, 13, 10)
ByteString(56, 13, 10)
ByteString(57, 13, 10)
ByteString(49, 48, 13, 10)
Number of elements: 10
将流元素发送到两个不同的接收器,同时保留它们各自的物化值的另一种选择是使用 alsoToMat
:
val sink1 = Sink.foreach(println)
val sink2 = Sink.fold[Int, ByteString](0)((acc, _) => acc + 1)
val (fut1, fut2) = Source(1 to 10)
.map(i => List(i.toString))
.via(CsvFormatting.format())
.alsoToMat(sink1)(Keep.right)
.toMat(sink2)(Keep.both)
.run() // (Future[Done], Future[Int])
fut2 onComplete {
case Success(count) => println(s"Number of elements: $count")
case Failure(_) =>
}
这产生与前面描述的图形示例相同的结果。
我正在尝试通过 Alpakka 的 CsvFormatting
将 Scala 实体的 Source
转换为 ByteString
的 Source
并计算初始流中的元素数量。你能建议最好的方法来计算 initialSource
元素并将结果保持为 ByteString
Source
:
val initialSource: Source[SomeEntity, NotUsed] = Source.fromPublisher(publisher)
val csvSource: Source[ByteString, NotUsed] = initialSource
.map(e => List(e.firstName, e.lastName, e.city))
.via(CsvFormatting.format())
要计算流中的元素,必须 运行 流。一种方法是将流元素广播到两个接收器:一个接收器是主要处理的结果,另一个接收器只是计算元素的数量。这是一个简单的示例,它使用 graph 来获取两个接收器的物化值:
val sink1 = Sink.foreach(println)
val sink2 = Sink.fold[Int, ByteString](0)((acc, _) => acc + 1)
val g = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) { implicit builder =>
(s1, s2) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[ByteString](2))
val source: Source[ByteString, NotUsed] =
Source(1 to 10)
.map(i => List(i.toString))
.via(CsvFormatting.format())
source ~> broadcast.in
broadcast.out(0) ~> s1
broadcast.out(1) ~> s2
ClosedShape
}) // RunnableGraph[(Future[Done], Future[Int])]
val (fut1, fut2) = g.run()
fut2 onComplete {
case Success(count) => println(s"Number of elements: $count")
case Failure(_) =>
}
在上面的示例中,第一个接收器仅打印流元素并具有 Future[Done]
类型的物化值。第二个接收器执行折叠操作以计算流元素并具有类型 Future[Int]
的物化值。打印如下:
ByteString(49, 13, 10)
ByteString(50, 13, 10)
ByteString(51, 13, 10)
ByteString(52, 13, 10)
ByteString(53, 13, 10)
ByteString(54, 13, 10)
ByteString(55, 13, 10)
ByteString(56, 13, 10)
ByteString(57, 13, 10)
ByteString(49, 48, 13, 10)
Number of elements: 10
将流元素发送到两个不同的接收器,同时保留它们各自的物化值的另一种选择是使用 alsoToMat
:
val sink1 = Sink.foreach(println)
val sink2 = Sink.fold[Int, ByteString](0)((acc, _) => acc + 1)
val (fut1, fut2) = Source(1 to 10)
.map(i => List(i.toString))
.via(CsvFormatting.format())
.alsoToMat(sink1)(Keep.right)
.toMat(sink2)(Keep.both)
.run() // (Future[Done], Future[Int])
fut2 onComplete {
case Success(count) => println(s"Number of elements: $count")
case Failure(_) =>
}
这产生与前面描述的图形示例相同的结果。