Storm 使用 Spring AMQP 从 RabbitMq 读取消息

Storm to read message from RabbitMq using Spring AMQP

我想从 rabbitMq 队列中使用我的 Storm Spout 中的消息。

现在,我们正在使用 Spring AMQP 从 RabbitMq 异步发送和接收消息。

Spring AMQP 提供机制(创建监听器或使用注解@RabbitListner)从队列中读取消息。

问题是我可以让监听器从队列中读取消息。但是我如何将此消息发送到风暴集群上 运行 的 Storm Spout?

拓扑将启动一个集群,但是在我的 spout 的 nextTuple() 方法中,我需要从这个队列中读取消息。 Spring AMQP 可以用在这里吗?

我有一个侦听器配置为从队列中读取消息:

@RabbitListener(queues = "queueName")
public void processMessage(QueueMessage message) {

} 

如何将监听器接收到的上述消息发送到我在集群上的 spout 运行。

或者,spout 的 nextTuple() 方法如何在其中包含此方法?可以吗

我在这里使用 Java 作为一种语言。

您可以使用 RabbitTemplate receivereceiveAndConvert 方法之一按需阅读消息(而不是消息驱动)。

默认情况下,如果队列中没有消息,它们将 return 为 null。

编辑:

如果您设置 receiveTimeout(在 1.5 或更高版本中可用),接收方法将在该时间阻塞(它在内部使用异步消费者并且不轮询)。

但它仍然不如监听器高效,因为每个方法都会创建一个新的消费者;要使用侦听器,您需要在 nextTuple()(例如 BlockingQueue)中使用一些内部阻塞机制来等待消息。