Equivalent of Balance, Broadcast and Merge in plain Akka Streams
In Akka Streams, using the Graph DSL builder, I can use the Balance, Broadcast and Merge operators:
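import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge}

Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._ // brings the ~> operator into scope
  val balancer = builder.add(Balance[Result1](2))
  val merger = builder.add(Merge[Result2](2))
  // two parallel branches, each running step1 and step2 behind async boundaries
  balancer.out(0) ~> step1.async ~> step2.async ~> merger.in(0)
  balancer.out(1) ~> step1.async ~> step2.async ~> merger.in(1)
  FlowShape(balancer.in, merger.out)
})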
How can I achieve the same logic using the plain Source, Sink and Flow API?
I can do something like this:
source.mapAsync(2)(Future(...))
But, as far as I can see, it is not semantically quite equivalent to the first example.
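One difference that can be observed is ordering: mapAsync emits results in upstream order, while a Balance/Merge pair makes no such promise, so mapAsyncUnordered is probably the closer match. It still runs Futures on an execution context rather than the step1 and step2 flows, so it is not a full equivalent either. The following is a minimal sketch, assuming Akka 2.6 (where an implicit ActorSystem provides the materializer); the object name and the work function are hypothetical and only there for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MapAsyncOrderingSketch {
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("map-async-sketch")
    implicit val ec: ExecutionContext = system.dispatcher

    // Hypothetical work: odd inputs take longer than even ones.
    def work(n: Int): Future[Int] = Future {
      if (n % 2 == 1) Thread.sleep(50)
      n * 10
    }

    // mapAsync keeps upstream order even when a later element finishes first.
    val ordered = Source(1 to 6).mapAsync(2)(work).runWith(Sink.seq)
    // mapAsyncUnordered emits results as they complete, in no guaranteed order.
    val unordered = Source(1 to 6).mapAsyncUnordered(2)(work).runWith(Sink.seq)

    try (Await.result(ordered, 10.seconds), Await.result(unordered, 10.seconds))
    finally system.terminate()
  }
}
```

Calling MapAsyncOrderingSketch.run() returns both result sequences; the first is always in input order, while the second contains the same elements in whatever order the Futures completed.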
Use Source.combine and Sink.combine. From the documentation:
There is a simplified API you can use to combine sources and sinks with junctions like: Broadcast[T], Balance[T], Merge[In] and Concat[A] without the need for using the Graph DSL. The combine method takes care of constructing the necessary graph underneath. In the following example we combine two sources into one (fan-in):
val sourceOne = Source(List(1))
val sourceTwo = Source(List(2))
val merged = Source.combine(sourceOne, sourceTwo)(Merge(_))
val mergedResult: Future[Int] = merged.runWith(Sink.fold(0)(_ + _))
The same can be done for a Sink[T] but in this case it will be fan-out:
val sendRmotely = Sink.actorRef(actorRef, "Done")
val localProcessing = Sink.foreach[Int](_ => /* do something useful */ ())
val sink = Sink.combine(sendRmotely, localProcessing)(Broadcast[Int](_))
Source(List(0, 1, 2)).runWith(sink)
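Applied to the question, Sink.combine can also take Balance as its strategy, which distributes elements across two worker branches without the Graph DSL. Note that this yields a Sink rather than a Flow, so there is no merged output to continue from; the results have to be consumed inside each branch. A minimal sketch, assuming Akka 2.6 and Scala 2.13; step1, step2, the queue and the latch are hypothetical stand-ins, not part of the original question's code:

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Balance, Flow, Sink, Source}
import scala.jdk.CollectionConverters._

object BalancedSinksSketch {
  // Hypothetical stand-ins for the question's step1 and step2.
  val step1: Flow[Int, Int, NotUsed] = Flow[Int].map(_ + 1)
  val step2: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)

  def run(): List[Int] = {
    implicit val system: ActorSystem = ActorSystem("balanced-sinks-sketch")
    val results = new ConcurrentLinkedQueue[Int]()
    val remaining = new CountDownLatch(10) // one count per source element

    // Each worker runs both steps behind async boundaries, then records its result.
    def worker: Sink[Int, NotUsed] =
      step1.async.via(step2.async).to(Sink.foreach[Int] { x =>
        results.add(x)
        remaining.countDown()
      })

    // Balance sends each element to whichever worker has demand.
    val balanced: Sink[Int, NotUsed] = Sink.combine(worker, worker)(Balance[Int](_))
    Source(1 to 10).runWith(balanced)

    try {
      remaining.await(10, TimeUnit.SECONDS)
      results.asScala.toList.sorted // arrival order is not deterministic, so sort
    } finally system.terminate()
  }
}
```

Because Sink.combine materializes to NotUsed, the sketch uses a latch to detect that all elements were processed. If the merged output is needed downstream as a Flow, the Graph DSL version from the question remains the more direct option.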