你如何处理一系列 Akka Stream Sources?
How do you process a sequence of Akka Stream Sources?
我们有一个可以处理事件的Sink
:
def parseEvent(): Sink[T, Future[akka.Done]] = {
Sink.foreach[T] { event => {
// Do stuff with the event
}}
}
这适用于单个 Source
:
val mySource: Source[T] = ...
mySource.takeWhile( someCheck, true ).runWith(parseEvent)
如果你有:
,你如何让它工作?
val mySources: Seq[Source[T]] = ...
所有来源应 运行 并行,所有事件应达到 parseEvent
。
以下内容应该符合要求:
import akka.NotUsed
import akka.stream.scaladsl.{ Concat, Merge, Source }
def sourceFromSources[T](sources: Seq[Source[T, NotUsed]]): Source[T, NotUsed] =
sources.size match {
case s if s < 1 => Source.empty[T]
case 1 => sources.head
case 2 => sources.head.merge(sources(1))
case _ => Source.combine(sources.head, sources(1), sources.drop(2): _*)(Merge(_))
}
合并策略“合并多个流,在元素从输入流到达时获取元素”,并在多个流具有可用元素时随机选择。背压从下游传播到上游。
我们有一个可以处理事件的Sink
:
def parseEvent(): Sink[T, Future[akka.Done]] = {
Sink.foreach[T] { event => {
// Do stuff with the event
}}
}
这适用于单个 Source
:
val mySource: Source[T] = ...
mySource.takeWhile( someCheck, true ).runWith(parseEvent)
如果你有:
,你如何让它工作?val mySources: Seq[Source[T]] = ...
所有来源应 运行 并行,所有事件应达到 parseEvent
。
以下内容应该符合要求:
import akka.NotUsed
import akka.stream.scaladsl.{ Concat, Merge, Source }
def sourceFromSources[T](sources: Seq[Source[T, NotUsed]]): Source[T, NotUsed] =
sources.size match {
case s if s < 1 => Source.empty[T]
case 1 => sources.head
case 2 => sources.head.merge(sources(1))
case _ => Source.combine(sources.head, sources(1), sources.drop(2): _*)(Merge(_))
}
合并策略“合并多个流,在元素从输入流到达时获取元素”,并在多个流具有可用元素时随机选择。背压从下游传播到上游。