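Does Akka Streams Unzip/Zip preserve order?
If I unzip a stream of tuples, apply some asynchronous transformations to the two resulting streams, and then zip them back together, does Akka guarantee that the elements are re-paired in the same order?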
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, Unzip, Zip}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}

// Split each tuple, transform both halves asynchronously, then pair them up again.
val graph: Flow[(Int, String), (Int, String), NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val unzip = builder.add(Unzip[Int, String])
  val increment = builder.add(Flow[Int].mapAsync(3) { num => Future(num + 1) })
  val append = builder.add(Flow[String].mapAsync(3) { letter => Future(s"$letter-x") })
  val zip = builder.add(Zip[Int, String])

  unzip.out0 ~> increment ~> zip.in0
  unzip.out1 ~> append ~> zip.in1

  FlowShape(unzip.in, zip.out)
})

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

// Run the tuples through the graph and collect the results.
val out = Source(collection.immutable.Seq((0, "a"), (1, "b"), (2, "c")))
  .via(graph)
  .runWith(Sink.seq)

Await.result(out, 1.second)
In this simple test, the output is Vector((1,a-x), (2,b-x), (3,c-x)). If I add a filter to the Int branch, however, the pairing changes:
val unzip = builder.add(Unzip[Int, String])
val increment = builder.add(Flow[Int].mapAsync(3) { num => Future(num + 1) })
val filter = builder.add(Flow[Int].filter(_ != 2))
val append = builder.add(Flow[String].mapAsync(3) { letter => Future(s"$letter-x") })
val zip = builder.add(Zip[Int, String])
unzip.out0 ~> increment ~> filter ~> zip.in0
unzip.out1 ~> append ~> zip.in1
// output: Vector((1,a-x), (3,b-x))
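The shifted pairing above follows from how Zip works: it pairs elements purely by their position on each input, so dropping an element on one branch moves every later element on that branch up by one. One possible way to keep the pairs aligned, sketched here under the assumption that a filtered-out number should remove its whole pair, is to let each branch emit exactly one element per input (an Option in this case) and discard the unwanted pairs only after zipping. The names alignedGraph, incrementOrDrop, dropEmpty and alignedResult are hypothetical and not part of the original question.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, Unzip, Zip}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}

// Same materializer setup as in the question (assumed Akka 2.5-style API).
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Hypothetical variant of the question's graph that never drops elements before Zip.
val alignedGraph: Flow[(Int, String), (Int, String), NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val unzip = builder.add(Unzip[Int, String])
    // Emit None instead of filtering, so this branch still yields one element per input.
    val incrementOrDrop = builder.add(Flow[Int].mapAsync(3) { num =>
      Future(Option(num + 1).filter(_ != 2))
    })
    val append = builder.add(Flow[String].mapAsync(3) { letter => Future(s"$letter-x") })
    val zip = builder.add(Zip[Option[Int], String])
    // Remove whole pairs after zipping, once both halves are already matched up.
    val dropEmpty = builder.add(
      Flow[(Option[Int], String)].collect { case (Some(n), s) => (n, s) }
    )

    unzip.out0 ~> incrementOrDrop ~> zip.in0
    unzip.out1 ~> append ~> zip.in1
    zip.out ~> dropEmpty

    FlowShape(unzip.in, dropEmpty.out)
  })

val alignedResult = Await.result(
  Source(List((0, "a"), (1, "b"), (2, "c"))).via(alignedGraph).runWith(Sink.seq),
  3.seconds
)
println(alignedResult) // Vector((1,a-x), (3,c-x))

system.terminate()
```

With this arrangement the value 3 stays paired with c-x rather than sliding onto b-x, because both inputs of Zip see the same number of elements.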
TL;DR: Yes, it does. From the Stream ordering section of the Akka documentation:
In Akka Streams almost all computation operators preserve input order of elements. This means that if inputs {IA1,IA2,...,IAn} “cause” outputs {OA1,OA2,...,OAk} and inputs {IB1,IB2,...,IBm} “cause” outputs {OB1,OB2,...,OBl} and all of IAi happened before all IBi then OAi happens before OBi.

This property is even upheld by async operations such as mapAsync, however an unordered version exists called mapAsyncUnordered which does not preserve this ordering.

However, in the case of Junctions which handle multiple input streams (e.g. Merge) the output order is, in general, not defined for elements arriving on different input ports. That is a merge-like operation may emit Ai before emitting Bi, and it is up to its internal logic to decide the order of emitted elements. Specialized elements such as Zip however do guarantee their outputs order, as each output element depends on all upstream elements having been signalled already – thus the ordering in the case of zipping is defined by this property.

If you find yourself in need of fine grained control over order of emitted elements in fan-in scenarios consider using MergePreferred, MergePrioritized or GraphStage – which gives you full control over how the merge is performed.
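The mapAsync guarantee quoted above can be checked with a small experiment. The following is only a sketch: it deliberately makes earlier elements take longer than later ones, so the futures complete in the reverse of input order, and then looks at the order in which mapAsync emits them. The name orderedResult and the Thread.sleep delay are illustrative assumptions; blocking inside a Future is used purely to force out-of-order completion and is not something to copy into real code.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}

// Same materializer setup as in the question (assumed Akka 2.5-style API).
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

val orderedResult = Await.result(
  Source(List(3, 2, 1))
    .mapAsync(3) { n =>
      // Larger values sleep longer, so the first element finishes last.
      Future { Thread.sleep(n * 100L); n }
    }
    .runWith(Sink.seq),
  5.seconds
)
println(orderedResult) // Vector(3, 2, 1): input order, not completion order

system.terminate()
```

Replacing mapAsync with mapAsyncUnordered in this sketch removes the guarantee, so elements may then be emitted in completion order. Placed on one branch between Unzip and Zip, that would allow mismatched pairs.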