使用 Alpakka 的无限 AMQP 消费者
Infinite AMQP Consumer with Alpakka
我正在尝试使用 Alpakka 实现一个连接到 AMQP 代理的非常简单的服务。我只是希望它在将消息推送到给定 exchange/topic.
时将其队列中的消息作为流使用
在我的测试中似乎一切正常,但当我尝试启动我的服务时,我意识到我的流只使用了我的消息一次然后就退出了。
基本上我使用的是 Alpakka 文档中的代码:
def consume()={
val amqpSource = AmqpSource.committableSource(
TemporaryQueueSourceSettings(connectionProvider, exchangeName)
.withDeclaration(exchangeDeclaration)
.withRoutingKey(topic),
bufferSize = prefetchCount
)
val amqpSink = AmqpSink.replyTo(AmqpReplyToSinkSettings(connectionProvider))
amqpSource.mapAsync(4)(msg => onMessage(msg)).runWith(amqpSink)
}
我尝试安排 consume()
每秒执行一次,但我遇到了 OutOfMemoryException
个问题。
有什么合适的方法可以使这段代码 运行 成为一个无限循环吗?
如果您希望 Source
在失败或被取消时重新启动,请用 RestartSource.withBackoff
包装它。
我正在尝试使用 Alpakka 实现一个连接到 AMQP 代理的非常简单的服务。我只是希望它在将消息推送到给定 exchange/topic.
时将其队列中的消息作为流使用在我的测试中似乎一切正常,但当我尝试启动我的服务时,我意识到我的流只使用了我的消息一次然后就退出了。
基本上我使用的是 Alpakka 文档中的代码:
def consume()={
val amqpSource = AmqpSource.committableSource(
TemporaryQueueSourceSettings(connectionProvider, exchangeName)
.withDeclaration(exchangeDeclaration)
.withRoutingKey(topic),
bufferSize = prefetchCount
)
val amqpSink = AmqpSink.replyTo(AmqpReplyToSinkSettings(connectionProvider))
amqpSource.mapAsync(4)(msg => onMessage(msg)).runWith(amqpSink)
}
我尝试安排 consume()
每秒执行一次,但我遇到了 OutOfMemoryException
个问题。
有什么合适的方法可以使这段代码 运行 成为一个无限循环吗?
如果您希望 Source
在失败或被取消时重新启动,请用 RestartSource.withBackoff
包装它。