如何定义 AmqpSource 订阅多个交换?

How to define AmqpSource to subscribe to multiple exchange?

现在我正在使用

订阅单一交换
AmqpSource.atMostOnceSource(
    NamedQueueSourceSettings(..))

我希望能够订阅多个交换。谁能帮我解决这个问题?

如果对于特定的 alpakka 源没有任何具体说明,您可以使用 Merge 或 MerbeHub。

如果您预先知道所有来源,您可以使用 Merge

将多个来源合并为一个

如果您不知道所有来源,您可以使用 MergeHub 例如

// A simple consumer that will print to the console for now  
val consumer = Sink.foreach(println)

// Attach a MergeHub Source to the consumer. This will materialize to a
// corresponding Sink.
val runnableGraph: RunnableGraph[Sink[String, NotUsed]] =
  MergeHub.source[String](perProducerBufferSize = 16).to(consumer)

// By running/materializing the consumer we get back a Sink, and hence
// now have access to feed elements into it. This Sink can be materialized
// any number of times, and every element that enters the Sink will
// be consumed by our consumer.
val toConsumer: Sink[String, NotUsed] = runnableGraph.run()

// Feeding two independent sources into the hub.
AmqpSource.atMostOnceSource(
   NamedQueueSourceSettings(..)).runWith(toConsumer)
AmqpSource.atMostOnceSource(
   NamedQueueSourceSettings(..)).runWith(toConsumer)