如何在多个源之间切换?
How to switch between multiple Sources?
假设我有两个相同类型的无限来源 可以连接到一个图表。我想 s 从外部已经具体化的图形在它们之间,可能与使用 KillS.
关闭其中一个的方法相同
val source1: Source[ByteString, NotUsed] = ???
val source2: Source[ByteString, NotUsed] = ???
val (switcher: Switcher, source: Source[ByteString, NotUsed]) =
Source.combine(source1,source2).withSwitcher.run()
switcher.switch()
默认情况下,我想使用 source1
,在 s 之后,我想使用来自 source2
的数据
source1
\
switcher ~> source
source2
是否可以使用 Akka Streams 实现此逻辑?
好的,过了一段时间我找到了解决方案。
所以在这里我可以使用与我们在 VLAN 中相同的原理。我只需要标记我的来源,然后通过 MergeHub 传递它们。之后很容易通过标签过滤这些来源并产生正确的结果作为来源。
我需要从一个源切换到另一个源,只需更改过滤条件即可。
source1.map(s => (tag1, s))
\
MergeHub.filter(_._1 == tagX).map(_._2) -> Source
/
source2.map(s => (tag2, s))
这里是一些例子:
object SomeSource {
private var current = "tag1"
val source1: Source[ByteString, NotUsed] = ???
val source2: Source[ByteString, NotUsed] = ???
def switch = {
current = if (current == "tag1") "tag2" else "tag1"
}
val (sink: Sink[(String, ByteString), NotUsed],
source: Source[ByteString, NotUsed]) =
MergeHub.source[(String, ByteString)]
.filter(_._1 == current)
.via(Flow[(String, ByteString)].map(_._2))
.toMat(BroadcastHub.sink[ByteString])(Keep.both).run()
source1.map(s => ("tag1", s)).runWith(sink)
source2.map(s => ("tag2", s)).runWith(sink)
}
SomeSource.source // do something with Source
SomeSource.switch() // then switch
假设我有两个相同类型的无限来源 可以连接到一个图表。我想 s 从外部已经具体化的图形在它们之间,可能与使用 KillS.
关闭其中一个的方法相同val source1: Source[ByteString, NotUsed] = ???
val source2: Source[ByteString, NotUsed] = ???
val (switcher: Switcher, source: Source[ByteString, NotUsed]) =
Source.combine(source1,source2).withSwitcher.run()
switcher.switch()
默认情况下,我想使用 source1
,在 s 之后,我想使用来自 source2
source1
\
switcher ~> source
source2
是否可以使用 Akka Streams 实现此逻辑?
好的,过了一段时间我找到了解决方案。
所以在这里我可以使用与我们在 VLAN 中相同的原理。我只需要标记我的来源,然后通过 MergeHub 传递它们。之后很容易通过标签过滤这些来源并产生正确的结果作为来源。
我需要从一个源切换到另一个源,只需更改过滤条件即可。
source1.map(s => (tag1, s))
\
MergeHub.filter(_._1 == tagX).map(_._2) -> Source
/
source2.map(s => (tag2, s))
这里是一些例子:
object SomeSource {
private var current = "tag1"
val source1: Source[ByteString, NotUsed] = ???
val source2: Source[ByteString, NotUsed] = ???
def switch = {
current = if (current == "tag1") "tag2" else "tag1"
}
val (sink: Sink[(String, ByteString), NotUsed],
source: Source[ByteString, NotUsed]) =
MergeHub.source[(String, ByteString)]
.filter(_._1 == current)
.via(Flow[(String, ByteString)].map(_._2))
.toMat(BroadcastHub.sink[ByteString])(Keep.both).run()
source1.map(s => ("tag1", s)).runWith(sink)
source2.map(s => ("tag2", s)).runWith(sink)
}
SomeSource.source // do something with Source
SomeSource.switch() // then switch