spring-rabbit 中的主题让消费者感到困惑
Consurrent consumers per topic in spring-rabbit
spring-rabbit 可以在一个主题上支持多个并发消费者吗?
详情如下
我的系统使用手动确认模式,通过 spring-rabbit (Spring 4.0.6) 进行主题交换。模式如下:
- 消息进入 ChannelAwareMessageListener
- 一个工厂方法生成一个合适的 worker 并传入对通道的引用
- 如果 worker 成功处理了消息,则消息被 Ack'd
- 如果 worker 不成功或发生异常,则消息被 Nack'd 并发送到死信队列以供稍后处理
由于这些工作人员中的一些可能需要相当多的时间来完成他们的 IO 绑定处理,我需要能够设置更多的并发消费者。
但是,经过一些测试后,我注意到有时多个消费者会收到相同的消息。果然,查看文档(http://docs.spring.io/spring-framework/docs/4.0.6.RELEASE/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html#setConcurrentConsumers-int-),证实了我的发现:
Do not raise the number of concurrent consumers for a topic, unless vendor-specific setup measures clearly allow for it. With regular setup, this would lead to concurrent consumption of the same message, which is hardly ever desirable.
我的问题如下:
- "unless vendor-specific setup measures clearly allow for it" 到底是什么意思?是否有 patch/version/configuration 或 Rabbit 支持此功能?
- 我可以轻松地在客户端上编写代码,以防止消息在另一个工作人员正在处理的情况下被处理。那么,我该如何处理这条消息呢?发送nack?忽略它?如果我 nack 然后实际处理消息的工作人员在一段时间后发送 ack 会发生什么?会抛出异常吗?
提前致谢...
您提到的警告是关于 JMS 而不是 RabbitMQ。看看 Spring RabbitMQ documentation。该文档不包含此警告。
一旦消息被传递到队列(无论交换类型如何),consumer/worker一次只能接收一次(假设没有问题).
如果您两次收到相同的消息,则说明某处存在问题:
- 消息被取消并重新排队
- channel/connection 在客户端关闭
- 存在网络问题,Rabbit 自动重新排队消息(channel/connection 在服务器端和客户端关闭)
对于最后两点,您应该会收到一些错误消息。
请注意,这一点在我看来是不必要的,并且可以解释问题:
- A factory method generates an appropriate worker and passes in a
reference to the channel
SimpleMessageListenerContainer
已经使用了 Executor
。当您使用自己的执行程序时,spring-amqp 通道池(如果您使用任何)和您的执行程序之间可能存在问题,例如。该频道已关闭,因为 spring-amqp 相信它不再被使用。
不要生成自己的线程,而是在当前 ChannelAwareMessageListener#onMessage
线程的同一线程上处理消息。
spring-rabbit 可以在一个主题上支持多个并发消费者吗?
详情如下
我的系统使用手动确认模式,通过 spring-rabbit (Spring 4.0.6) 进行主题交换。模式如下:
- 消息进入 ChannelAwareMessageListener
- 一个工厂方法生成一个合适的 worker 并传入对通道的引用
- 如果 worker 成功处理了消息,则消息被 Ack'd
- 如果 worker 不成功或发生异常,则消息被 Nack'd 并发送到死信队列以供稍后处理
由于这些工作人员中的一些可能需要相当多的时间来完成他们的 IO 绑定处理,我需要能够设置更多的并发消费者。
但是,经过一些测试后,我注意到有时多个消费者会收到相同的消息。果然,查看文档(http://docs.spring.io/spring-framework/docs/4.0.6.RELEASE/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html#setConcurrentConsumers-int-),证实了我的发现:
Do not raise the number of concurrent consumers for a topic, unless vendor-specific setup measures clearly allow for it. With regular setup, this would lead to concurrent consumption of the same message, which is hardly ever desirable.
我的问题如下:
- "unless vendor-specific setup measures clearly allow for it" 到底是什么意思?是否有 patch/version/configuration 或 Rabbit 支持此功能?
- 我可以轻松地在客户端上编写代码,以防止消息在另一个工作人员正在处理的情况下被处理。那么,我该如何处理这条消息呢?发送nack?忽略它?如果我 nack 然后实际处理消息的工作人员在一段时间后发送 ack 会发生什么?会抛出异常吗?
提前致谢...
您提到的警告是关于 JMS 而不是 RabbitMQ。看看 Spring RabbitMQ documentation。该文档不包含此警告。
一旦消息被传递到队列(无论交换类型如何),consumer/worker一次只能接收一次(假设没有问题).
如果您两次收到相同的消息,则说明某处存在问题:
- 消息被取消并重新排队
- channel/connection 在客户端关闭
- 存在网络问题,Rabbit 自动重新排队消息(channel/connection 在服务器端和客户端关闭)
对于最后两点,您应该会收到一些错误消息。
请注意,这一点在我看来是不必要的,并且可以解释问题:
- A factory method generates an appropriate worker and passes in a reference to the channel
SimpleMessageListenerContainer
已经使用了 Executor
。当您使用自己的执行程序时,spring-amqp 通道池(如果您使用任何)和您的执行程序之间可能存在问题,例如。该频道已关闭,因为 spring-amqp 相信它不再被使用。
不要生成自己的线程,而是在当前 ChannelAwareMessageListener#onMessage
线程的同一线程上处理消息。