使用 Alpakka 的无限 AMQP 消费者

Infinite AMQP Consumer with Alpakka

我正在尝试使用 Alpakka 实现一个连接到 AMQP 代理的非常简单的服务。我只是希望它在将消息推送到给定 exchange/topic.

时将其队列中的消息作为流使用

在我的测试中似乎一切正常,但当我尝试启动我的服务时,我意识到我的流只使用了我的消息一次然后就退出了。

基本上我使用的是 Alpakka 文档中的代码:

def consume()={
    val amqpSource = AmqpSource.committableSource(
      TemporaryQueueSourceSettings(connectionProvider, exchangeName)
        .withDeclaration(exchangeDeclaration)
        .withRoutingKey(topic),
      bufferSize = prefetchCount
    )

    val amqpSink = AmqpSink.replyTo(AmqpReplyToSinkSettings(connectionProvider))

    amqpSource.mapAsync(4)(msg => onMessage(msg)).runWith(amqpSink)
}

我尝试安排 consume() 每秒执行一次,但我遇到了 OutOfMemoryException 个问题。

有什么合适的方法可以使这段代码 运行 成为一个无限循环吗?

如果您希望 Source 在失败或被取消时重新启动,请用 RestartSource.withBackoff 包装它。