Akka Stream - 如何从多个 SQS 源流式传输
Akka Stream - How to Stream from multiple SQS Sources
这是 的后续 post。
假设我有多个要从中流式传输的 SQS 队列。我正在使用 Alpakka 的 AWS SQS Connector 创建 Source
.
implicit val sqsClient: AmazonSQSAsync = ???
val queueUrls: List[String] = ???
val sources: List[Source[Message, NotUsed]] = queueUrls.map(url => SqsSource(url))
现在,我想 combine
合并它们的来源。但是,Source.combine 方法不支持将列表作为参数传递,但仅支持可变参数。
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int ⇒ Graph[UniformFanInShape[T, U], NotUsed])
当然,我可以手指输入所有源参数。但是,如果我有 10 个源队列,参数会变得很长。
有没有办法从源列表中合并源?
[补充]
正如 Ramon J Romero y Vigil 指出的那样,保持流 "a thin veneer" 是更好的做法。但是,在这种特殊情况下,我使用单个 sqsClient
进行所有 SqsSource
初始化。
您可以使用 foldLeft
连接或合并来源:
val sources: List[Source[Message, NotUsed]] = ???
val concatenated: Source[Message, NotUsed] = sources.foldLeft(Source.empty[Message])(_ ++ _)
// the same as sources.foldLeft(Source.empty[Message])(_ concat _)
val merged: Source[Message, NotUsed] = sources.foldLeft(Source.empty[Message])(_ merge _)
或者,您可以将 Source.zipN
与 flatMapConcat
一起使用:
val combined: Source[Message, NotUsed] = Source.zipN(sources).flatMapConcat(Source.apply)
这是
假设我有多个要从中流式传输的 SQS 队列。我正在使用 Alpakka 的 AWS SQS Connector 创建 Source
.
implicit val sqsClient: AmazonSQSAsync = ???
val queueUrls: List[String] = ???
val sources: List[Source[Message, NotUsed]] = queueUrls.map(url => SqsSource(url))
现在,我想 combine
合并它们的来源。但是,Source.combine 方法不支持将列表作为参数传递,但仅支持可变参数。
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int ⇒ Graph[UniformFanInShape[T, U], NotUsed])
当然,我可以手指输入所有源参数。但是,如果我有 10 个源队列,参数会变得很长。
有没有办法从源列表中合并源?
[补充]
正如 Ramon J Romero y Vigil 指出的那样,保持流 "a thin veneer" 是更好的做法。但是,在这种特殊情况下,我使用单个 sqsClient
进行所有 SqsSource
初始化。
您可以使用 foldLeft
连接或合并来源:
val sources: List[Source[Message, NotUsed]] = ???
val concatenated: Source[Message, NotUsed] = sources.foldLeft(Source.empty[Message])(_ ++ _)
// the same as sources.foldLeft(Source.empty[Message])(_ concat _)
val merged: Source[Message, NotUsed] = sources.foldLeft(Source.empty[Message])(_ merge _)
或者,您可以将 Source.zipN
与 flatMapConcat
一起使用:
val combined: Source[Message, NotUsed] = Source.zipN(sources).flatMapConcat(Source.apply)