根据条件复用 Akka sources/flows

Multiplex Akka sources/flows based on condition

有没有办法根据某些外部条件复用两个或多个 Akka 源或流?它可能看起来像这样:

def cond: Boolean = ???

val src1 = Source.fromIterator(i1)
val src2 = Source.fromIterator(i2)
val src3 = Source.mux(src1, src2, cond)

取决于 cond 结果 src3 应包含来自 src1 的项目或来自 src2 的项目,绝不能包含两者。

我发现好像是相反的操作divertTo。同时,none的fan-in操作似乎支持条件合并。

我不确定这是你需要的,但你可以尝试以下方法:

def cond: Boolean = Random.nextBoolean()

val src1 = Source.fromIterator(() => LazyList.from(1).iterator)
val src2 = Source.fromIterator(() => LazyList.from(-1, -1).iterator)
val src3 = src1.zip(src2).map(pair => if (cond) pair._1 else pair._2)

src3.runForeach(println)

在一个 run in Scastie 中,输出开始于:

1
-2
3
4
-5
-6
7
-8
-9
10
...

如您所见,在此示例中,我在 2 个流之间随机选择。

我会提出以下建议:

def mux[T](a: Source[T, Any], b: Source[T, Any])(cond: Int => Boolean): Source[T, Any] = {
  a.map((1, _)).merge(b.map((2, _)))
    .filter(t => cond(t._1))
    .map(_._2)
}

简单地说,它将标识符附加到每个源发出的元素(这里是 1、2,但可以是任何东西),然后使用提供的 cond 函数进行过滤以仅保留来自当前选择的源,然后映射回元素。

我认为 zip 不是一个好主意,因为它 “当所有输入都有可用元素时发出”,即即使有可用元素从源 A 并且您实际上想切换到源 A,zip 将等到 B 中有一个可用元素才会发出 (ref)。

另一方面,merge 将在任何源有可用项目时立即发出。