Storm 使用 Spring AMQP 从 RabbitMq 读取消息
Storm to read message from RabbitMq using Spring AMQP
我想从 rabbitMq 队列中使用我的 Storm Spout 中的消息。
现在,我们正在使用 Spring AMQP 从 RabbitMq 异步发送和接收消息。
Spring AMQP 提供机制(创建监听器或使用注解@RabbitListner)从队列中读取消息。
问题是我可以让监听器从队列中读取消息。但是我如何将此消息发送到风暴集群上 运行 的 Storm Spout?
拓扑将启动一个集群,但是在我的 spout 的 nextTuple() 方法中,我需要从这个队列中读取消息。 Spring AMQP 可以用在这里吗?
我有一个侦听器配置为从队列中读取消息:
@RabbitListener(queues = "queueName")
public void processMessage(QueueMessage message) {
}
如何将监听器接收到的上述消息发送到我在集群上的 spout 运行。
或者,spout 的 nextTuple() 方法如何在其中包含此方法?可以吗
我在这里使用 Java 作为一种语言。
您可以使用 RabbitTemplate
receive
或 receiveAndConvert
方法之一按需阅读消息(而不是消息驱动)。
默认情况下,如果队列中没有消息,它们将 return 为 null。
编辑:
如果您设置 receiveTimeout
(在 1.5 或更高版本中可用),接收方法将在该时间阻塞(它在内部使用异步消费者并且不轮询)。
但它仍然不如监听器高效,因为每个方法都会创建一个新的消费者;要使用侦听器,您需要在 nextTuple()
(例如 BlockingQueue
)中使用一些内部阻塞机制来等待消息。
我想从 rabbitMq 队列中使用我的 Storm Spout 中的消息。
现在,我们正在使用 Spring AMQP 从 RabbitMq 异步发送和接收消息。
Spring AMQP 提供机制(创建监听器或使用注解@RabbitListner)从队列中读取消息。
问题是我可以让监听器从队列中读取消息。但是我如何将此消息发送到风暴集群上 运行 的 Storm Spout?
拓扑将启动一个集群,但是在我的 spout 的 nextTuple() 方法中,我需要从这个队列中读取消息。 Spring AMQP 可以用在这里吗?
我有一个侦听器配置为从队列中读取消息:
@RabbitListener(queues = "queueName")
public void processMessage(QueueMessage message) {
}
如何将监听器接收到的上述消息发送到我在集群上的 spout 运行。
或者,spout 的 nextTuple() 方法如何在其中包含此方法?可以吗
我在这里使用 Java 作为一种语言。
您可以使用 RabbitTemplate
receive
或 receiveAndConvert
方法之一按需阅读消息(而不是消息驱动)。
默认情况下,如果队列中没有消息,它们将 return 为 null。
编辑:
如果您设置 receiveTimeout
(在 1.5 或更高版本中可用),接收方法将在该时间阻塞(它在内部使用异步消费者并且不轮询)。
但它仍然不如监听器高效,因为每个方法都会创建一个新的消费者;要使用侦听器,您需要在 nextTuple()
(例如 BlockingQueue
)中使用一些内部阻塞机制来等待消息。