基于 akka 流条件的替代流
Alternative flows based on condition for akka stream
有一个带有自定义流的流,在某个阶段我想拆分流并有两个备选数据处理,稍后将再次合并。
例如
-> F3 -> F6
Src -> F1 -> F2 > Merge -> Sink
-> F4 -> F5
F2
应该有一个条件说明如果数据包含格式 A
那么它应该去流 F3
,否则去 F4
.
据我所知,每个流在每个方向上只能有一个端口(如果是 bidi,则为两个)- 那么我如何支持这样的流?
您可以使用 Broadcast
拆分流,然后您可以在每个流上使用 filter
或 collect
来过滤所需的数据。
val split = builder.add(Broadcast[Int](2))
Src -> F1 -> split -> filterCondA -> F3 -> F6 -> Merge -> Sink
-> filterCondB -> F4 -> F5 -> Merge
此外,还有 Partition
阶段处理输出端口的数量和从值到端口号 f: T => Int
的映射函数。
val portMapper(value: T): Int = value match {
case CondA => 0
case CondB => 1
}
val split = builder.add(Partition[T](2, portMapper))
Src -> F1 -> split -> F3 -> F6 -> Merge -> Sink
split -> F4 -> F5 -> Merge
也许有更简单的方法。
有一个带有自定义流的流,在某个阶段我想拆分流并有两个备选数据处理,稍后将再次合并。
例如
-> F3 -> F6
Src -> F1 -> F2 > Merge -> Sink
-> F4 -> F5
F2
应该有一个条件说明如果数据包含格式 A
那么它应该去流 F3
,否则去 F4
.
据我所知,每个流在每个方向上只能有一个端口(如果是 bidi,则为两个)- 那么我如何支持这样的流?
您可以使用 Broadcast
拆分流,然后您可以在每个流上使用 filter
或 collect
来过滤所需的数据。
val split = builder.add(Broadcast[Int](2))
Src -> F1 -> split -> filterCondA -> F3 -> F6 -> Merge -> Sink
-> filterCondB -> F4 -> F5 -> Merge
此外,还有 Partition
阶段处理输出端口的数量和从值到端口号 f: T => Int
的映射函数。
val portMapper(value: T): Int = value match {
case CondA => 0
case CondB => 1
}
val split = builder.add(Partition[T](2, portMapper))
Src -> F1 -> split -> F3 -> F6 -> Merge -> Sink
split -> F4 -> F5 -> Merge
也许有更简单的方法。