如何有条件地将消息传递到特定的下一阶段(以及其他阶段)?

how to conditionally pass a message to a specific next stage (among other stages)?

考虑一个简单的场景,当依赖于传递的消息的某些属性时,我希望它由特定的下一阶段处理并继续。

[Source[ActionMessage]] ~> [Flow[ActionMessage, EnrichedActionMessage]] 
~> (eAM: EnrichedActionMessage => eAM.actionType match {
      case ActionA => eAM ~> Flow[EnrichedActionMessage, ReactionA] ~> Sink[ReactionA]
      case ActionB => eAM ~> Flow[EnrichedActionMessage, ReactionB] ~> Sink[ReactionB]
      case ActionC => eAM ~> Flow[EnrichedActionMessage, ReactionC] ~> Sink[ReactionC]
    })

如何实现到阶段图阶段的条件路由?

此答案基于 akka-stream 版本 2.4.2-RC1。 API 在其他版本中可能略有不同。依赖可以被 sbt:

消费
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2-RC1"

使用 Partition 组件:

val shape = GraphDSL.create() { implicit b ⇒
  import GraphDSL.Implicits._

  val first = b.add(Sink.foreach[Int](elem ⇒ println("first:\t" + elem)))
  val second = b.add(Sink.foreach[Int](elem ⇒ println("second:\t" + elem)))
  val third = b.add(Sink.foreach[Int](elem ⇒ println("third:\t" + elem)))
  val p = b.add(Partition[Int](3, elem ⇒ elem match {
    case 0                ⇒ 0
    case elem if elem < 0 ⇒ 1
    case elem if elem > 0 ⇒ 2
  }))

  p ~> first
  p ~> second
  p ~> third

  SinkShape(p.in)
}
Source(List(0, 1, 2, -1, 1, -5, 0)).to(shape).run()

/*
Output:
first: 0
third: 1
third: 2
second: -1
third: 1
second: -5
first: 0
*/

如果您想稍后对元素进行任何处理,您也可以 return new FanOutShape3(p.in, p.out(0), p.out(1), p.out(2)) 而不是 SinkShape