基于 akka 流条件的替代流

Alternative flows based on condition for akka stream

有一个带有自定义流的流,在某个阶段我想拆分流并有两个备选数据处理,稍后将再次合并。

例如

                  -> F3 -> F6 
Src -> F1 -> F2                > Merge -> Sink 
                  -> F4 -> F5

F2 应该有一个条件说明如果数据包含格式 A 那么它应该去流 F3,否则去 F4.

据我所知,每个流在每个方向上只能有一个端口(如果是 bidi,则为两个)- 那么我如何支持这样的流?

您可以使用 Broadcast 拆分流,然后您可以在每个流上使用 filtercollect 来过滤所需的数据。

val split = builder.add(Broadcast[Int](2))

Src -> F1 -> split -> filterCondA -> F3 -> F6 -> Merge -> Sink
                   -> filterCondB -> F4 -> F5 -> Merge

此外,还有 Partition 阶段处理输出端口的数量和从值到端口号 f: T => Int 的映射函数。

val portMapper(value: T): Int = value match {
  case CondA => 0
  case CondB => 1
}

val split = builder.add(Partition[T](2, portMapper))

Src -> F1 -> split -> F3 -> F6 -> Merge -> Sink
             split -> F4 -> F5 -> Merge

也许有更简单的方法。