根据条件复用 Akka sources/flows
Multiplex Akka sources/flows based on condition
有没有办法根据某些外部条件复用两个或多个 Akka 源或流?它可能看起来像这样:
def cond: Boolean = ???
val src1 = Source.fromIterator(i1)
val src2 = Source.fromIterator(i2)
val src3 = Source.mux(src1, src2, cond)
取决于 cond
结果 src3
应包含来自 src1
的项目或来自 src2
的项目,绝不能包含两者。
我发现好像是相反的操作divertTo
。同时,none的fan-in操作似乎支持条件合并。
我不确定这是你需要的,但你可以尝试以下方法:
def cond: Boolean = Random.nextBoolean()
val src1 = Source.fromIterator(() => LazyList.from(1).iterator)
val src2 = Source.fromIterator(() => LazyList.from(-1, -1).iterator)
val src3 = src1.zip(src2).map(pair => if (cond) pair._1 else pair._2)
src3.runForeach(println)
在一个 run in Scastie 中,输出开始于:
1
-2
3
4
-5
-6
7
-8
-9
10
...
如您所见,在此示例中,我在 2 个流之间随机选择。
我会提出以下建议:
def mux[T](a: Source[T, Any], b: Source[T, Any])(cond: Int => Boolean): Source[T, Any] = {
a.map((1, _)).merge(b.map((2, _)))
.filter(t => cond(t._1))
.map(_._2)
}
简单地说,它将标识符附加到每个源发出的元素(这里是 1、2,但可以是任何东西),然后使用提供的 cond
函数进行过滤以仅保留来自当前选择的源,然后映射回元素。
我认为 zip
不是一个好主意,因为它 “当所有输入都有可用元素时发出”,即即使有可用元素从源 A 并且您实际上想切换到源 A,zip
将等到 B 中有一个可用元素才会发出 (ref)。
另一方面,merge
将在任何源有可用项目时立即发出。
有没有办法根据某些外部条件复用两个或多个 Akka 源或流?它可能看起来像这样:
def cond: Boolean = ???
val src1 = Source.fromIterator(i1)
val src2 = Source.fromIterator(i2)
val src3 = Source.mux(src1, src2, cond)
取决于 cond
结果 src3
应包含来自 src1
的项目或来自 src2
的项目,绝不能包含两者。
我发现好像是相反的操作divertTo
。同时,none的fan-in操作似乎支持条件合并。
我不确定这是你需要的,但你可以尝试以下方法:
def cond: Boolean = Random.nextBoolean()
val src1 = Source.fromIterator(() => LazyList.from(1).iterator)
val src2 = Source.fromIterator(() => LazyList.from(-1, -1).iterator)
val src3 = src1.zip(src2).map(pair => if (cond) pair._1 else pair._2)
src3.runForeach(println)
在一个 run in Scastie 中,输出开始于:
1
-2
3
4
-5
-6
7
-8
-9
10
...
如您所见,在此示例中,我在 2 个流之间随机选择。
我会提出以下建议:
def mux[T](a: Source[T, Any], b: Source[T, Any])(cond: Int => Boolean): Source[T, Any] = {
a.map((1, _)).merge(b.map((2, _)))
.filter(t => cond(t._1))
.map(_._2)
}
简单地说,它将标识符附加到每个源发出的元素(这里是 1、2,但可以是任何东西),然后使用提供的 cond
函数进行过滤以仅保留来自当前选择的源,然后映射回元素。
我认为 zip
不是一个好主意,因为它 “当所有输入都有可用元素时发出”,即即使有可用元素从源 A 并且您实际上想切换到源 A,zip
将等到 B 中有一个可用元素才会发出 (ref)。
另一方面,merge
将在任何源有可用项目时立即发出。