Does Akka Streams Unzip/Zip preserve order?
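If I unzip a stream of tuples, perform some asynchronous transformations on the two resulting streams, and then zip them back together, does Akka guarantee that the elements are re-zipped in the same order?
Example: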
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, Unzip, Zip}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
val graph: Flow[(Int, String), (Int, String), NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val unzip = builder.add(Unzip[Int, String])
val increment = builder.add(Flow[Int].mapAsync(3) { num => Future(num + 1) })
val append = builder.add(Flow[String].mapAsync(3) { letter => Future(s"$letter-x") })
val zip = builder.add(Zip[Int, String])
unzip.out0 ~> increment ~> zip.in0
unzip.out1 ~> append ~> zip.in1
FlowShape(unzip.in, zip.out)
})
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val out = Source(collection.immutable.Seq((0, "a"), (1, "b"), (2, "c")))
.via(graph)
.runWith(Sink.seq)
Await.result(out, 1.second) // dot syntax avoids the postfixOps feature warning
In this simple test the output is Vector((1,a-x), (2,b-x), (3,c-x)), so things look fine. But I'm not sure I can rely on that always being the case.
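The simple test above completes its futures almost immediately, so it says little about what happens when they finish out of order. A minimal sketch of a harsher check, assuming Akka 2.5 or 2.6 and using `akka.pattern.after` for non-blocking delays: the hypothetical helper `reversedDelay` makes later elements complete before earlier ones in both branches, and the script compares the zipped output with the pairing expected if order is kept. The element count, delays and parallelism of 8 are arbitrary choices.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, Unzip, Zip}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer() // can be dropped on Akka 2.6+
import system.dispatcher // ExecutionContext for the Futures below

val count = 20

// Hypothetical helper: completes with `value` after a delay that shrinks as n grows,
// so inside each mapAsync stage later elements finish before earlier ones.
def reversedDelay[A](n: Int)(value: A): Future[A] =
  after(20.millis * (count - n).toLong, system.scheduler)(Future.successful(value))

val graph: Flow[(Int, String), (Int, String), NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  val unzip = builder.add(Unzip[Int, String]())
  val increment = builder.add(Flow[Int].mapAsync(8)(n => reversedDelay(n)(n + 1)))
  // The String branch recovers n from the string so it follows the same delay schedule.
  val append = builder.add(Flow[String].mapAsync(8)(s => reversedDelay(s.toInt)(s"$s-x")))
  val zip = builder.add(Zip[Int, String]())
  unzip.out0 ~> increment ~> zip.in0
  unzip.out1 ~> append ~> zip.in1
  FlowShape(unzip.in, zip.out)
})

val input = (0 until count).map(n => (n, n.toString))
val result = Await.result(Source(input).via(graph).runWith(Sink.seq), 10.seconds)

// If mapAsync keeps order and Zip pairs elements positionally, every pair stays intact.
val expected = input.map { case (n, s) => (n + 1, s"$s-x") }
println(result == expected)
system.terminate()
```

A passing run is evidence rather than proof; it only shows that out-of-order completion inside `mapAsync` does not leak into the zipped pairs in this setup.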
What gives me some concern is this variation, which adds a filter to the Int branch:
val unzip = builder.add(Unzip[Int, String])
val increment = builder.add(Flow[Int].mapAsync(3) { num => Future(num + 1) })
val filter = builder.add(Flow[Int].filter(_ != 2))
val append = builder.add(Flow[String].mapAsync(3) { letter => Future(s"$letter-x") })
val zip = builder.add(Zip[Int, String])
unzip.out0 ~> increment ~> filter ~> zip.in0 // filter drops 2 (from input 1), so 3 gets paired with "b-x"
unzip.out1 ~> append ~> zip.in1
// output: Vector((1,a-x), (3,b-x))
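Even if order is preserved, there is no guarantee that the original tuple relationships are preserved.
I can manually inspect my flows to make sure there is no filtering logic. But once I have done that, can I be sure the tuples will be re-zipped in the order they were received?
TL;DR: Yes, it does. From the Stream ordering section of the Akka documentation: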
In Akka Streams almost all computation operators preserve input order of elements. This means that if inputs {IA1,IA2,...,IAn} “cause” outputs {OA1,OA2,...,OAk} and inputs {IB1,IB2,...,IBm} “cause” outputs {OB1,OB2,...,OBl} and all of IAi happened before all IBi then OAi happens before OBi.

This property is even upheld by async operations such as mapAsync, however an unordered version exists called mapAsyncUnordered which does not preserve this ordering.

However, in the case of Junctions which handle multiple input streams (e.g. Merge) the output order is, in general, not defined for elements arriving on different input ports. That is a merge-like operation may emit Ai before emitting Bi, and it is up to its internal logic to decide the order of emitted elements. Specialized elements such as Zip however do guarantee their outputs order, as each output element depends on all upstream elements having been signalled already – thus the ordering in the case of zipping is defined by this property.

If you find yourself in need of fine grained control over order of emitted elements in fan-in scenarios consider using MergePreferred, MergePrioritized or GraphStage – which gives you full control over how the merge is performed.
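The quoted guarantee covers ordering but not the filter concern from the question: Zip pairs elements by position, so any stage that drops or adds elements on only one branch silently shifts the pairing. One way to rule that out, sketched here as an alternative rather than anything the Akka documentation prescribes, is to not split the tuple at all and transform both halves inside a single `mapAsync` stage, combining the two futures with `Future#zip`. It assumes the same Akka 2.5/2.6 setup as the question; the names `paired` and `out` are illustrative.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer() // can be dropped on Akka 2.6+
import system.dispatcher // ExecutionContext for the Futures below

// Both halves of each tuple are transformed inside one mapAsync element,
// so a filter can only drop whole pairs and can never shift one side.
val paired: Flow[(Int, String), (Int, String), NotUsed] =
  Flow[(Int, String)]
    .mapAsync(3) { case (num, letter) =>
      Future(num + 1).zip(Future(s"$letter-x")) // run both halves concurrently
    }
    .filter { case (num, _) => num != 2 } // same predicate as the question's filter

// Only (1,a-x) and (3,c-x) survive: the pair built from input (1, "b") is dropped as a whole.
val out = Await.result(
  Source(List((0, "a"), (1, "b"), (2, "c"))).via(paired).runWith(Sink.seq),
  1.second)
println(out)
system.terminate()
```

The trade-off is that the two halves no longer get separate stages, so they cannot have independent parallelism or be routed through different sub-graphs.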