akka-stream - 如何在 Flow/Graph 中以不同方式处理流的最后一个元素
akka-stream - How to treat the last element of a stream differently in a Flow/Graph
我正在尝试实现一个 Akka Streams Flow
,它将 JSON 个对象的流转换为 JSON 个对象的单个数组的流。我可以使用 Concat
在元素之间添加“[”之前和“]”,以及 Zip
在元素之间插入逗号,但我不知道如何不插入最后一个逗号.
我目前的代码是:
trait JsonStreamSupport {
protected def toJsonArrayString[T : Writes] = Flow[T].map(Json.toJson(_)).map(_.toString()).via(jsonArrayWrapper)
private[this] val jsonArrayWrapper: Flow[String, String, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val start = Source.single("[")
val comma = Source.repeat(",")
val end = Source.single("]")
val concat = b.add(Concat[String](3))
val zip = b.add(Zip[String,String])
comma ~> zip.in1
start ~> concat.in(0)
zip.out.map({case (msg,delim) => msg + delim}) ~> concat.in(1)
end ~> concat.in(2)
FlowShape(zip.in0, concat.out)
})
}
当前输出为:
[{"key":"value},{"key","value"},]
但我需要它
[{"key":"value},{"key","value"}]
(没有最后的逗号),
其中数组的每个元素仍然是流的不同元素,因此可以例如通过分块 HTTP 单独发送。
刚刚发现 intersperse
这正是您所需要的,而且比我最初建议的要简单得多:
我正在尝试实现一个 Akka Streams Flow
,它将 JSON 个对象的流转换为 JSON 个对象的单个数组的流。我可以使用 Concat
在元素之间添加“[”之前和“]”,以及 Zip
在元素之间插入逗号,但我不知道如何不插入最后一个逗号.
我目前的代码是:
trait JsonStreamSupport {
protected def toJsonArrayString[T : Writes] = Flow[T].map(Json.toJson(_)).map(_.toString()).via(jsonArrayWrapper)
private[this] val jsonArrayWrapper: Flow[String, String, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val start = Source.single("[")
val comma = Source.repeat(",")
val end = Source.single("]")
val concat = b.add(Concat[String](3))
val zip = b.add(Zip[String,String])
comma ~> zip.in1
start ~> concat.in(0)
zip.out.map({case (msg,delim) => msg + delim}) ~> concat.in(1)
end ~> concat.in(2)
FlowShape(zip.in0, concat.out)
})
}
当前输出为:
[{"key":"value},{"key","value"},]
但我需要它
[{"key":"value},{"key","value"}]
(没有最后的逗号),
其中数组的每个元素仍然是流的不同元素,因此可以例如通过分块 HTTP 单独发送。
刚刚发现 intersperse
这正是您所需要的,而且比我最初建议的要简单得多: