具有 Spring 的 RabbitMQ 永远重新传递消息
RabbitMQ with Spring redelivering message forever
我将 Spring 与 RabbitMQ 一起使用,我试图避免在发生运行时异常时重新传递消息。我试图在 listener-container
中将 requeue-reject
设置为 false
并配置一个自定义错误处理程序来抛出 AmqpRejectAndDontRequeueException
。似乎这两种策略都失败了,消息会永远重新传递。有什么想法吗?
感谢您的帮助。
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.1.xsd">
<rabbit:connection-factory id="rabbitMQConnectionFactory" host="localhost" port="5672" username="guest" password="guest" />
<rabbit:admin connection-factory="rabbitMQConnectionFactory" />
<rabbit:template id="amqpTemplate" connection-factory="rabbitMQConnectionFactory" />
<rabbit:queue name="q1" />
<rabbit:queue name="q2" />
<rabbit:listener-container error-handler="errorHandler" connection-factory="rabbitMQConnectionFactory" concurrency="10" transaction-manager="transactionManager" requeue-rejected="false">
<rabbit:listener ref="q1Listener" method="consumeMessage" queue-names="q1" />
<rabbit:listener ref="q2Listener" method="consumeMessage" queue-names="q2" />
</rabbit:listener-container>
<bean id="errorHandler" class="ErrorHandler" />
<bean id="q1Listener" class="Q1MessageConsumerBean" />
<bean id="q2Listener" class="Q2MessageConsumerBean" />
</beans>
首先,requeue-rejected="false"
和 AmqpRejectAndDontRequeueException
对来自目标侦听器的 RuntimeException
执行相同的操作。
这是一个测试用例,表明 requeue-rejected="false"
运行良好。
<rabbit:connection-factory id="connectionFactory" host="localhost" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="foo" routing-key="foo" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="foo" />
<rabbit:direct-exchange name="foo">
<rabbit:bindings>
<rabbit:binding queue="foo" key="foo" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container connection-factory="connectionFactory" auto-startup="false" requeue-rejected="false">
<rabbit:listener ref="listener" queue-names="foo" />
</rabbit:listener-container>
<bean id="listener" class="org.mockito.Mockito" factory-method="spy">
<constructor-arg>
<bean class="org.springframework.amqp.rabbit.listener.RejectedTests$ThrowListener" />
</constructor-arg>
</bean>
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private SimpleMessageListenerContainer container;
@Autowired
private ThrowListener throwListener;
@Test
public void test() throws Exception {
rabbitTemplate.convertAndSend("foo");
container.start();
Thread.sleep(2000);
Mockito.verify(throwListener).onMessage(Mockito.any(Message.class));
}
public static class ThrowListener implements MessageListener {
@Override
public void onMessage(Message message) {
throw new RuntimeException("intentional reject");
}
}
Mockito.verify(throwListener).onMessage(Mockito.any(Message.class));
确认 onMessage
仅在首次交付时被调用一次。第二次传递没有发生,因为我们的消息被 RabbitMQ Broker 拒绝并丢弃。
我只看到一个地方,它不是独立于 requeue-rejected="false"
- RabbitResourceHolder#rollbackAll()
:
for (Long deliveryTag : deliveryTags.get(channel)) {
try {
channel.basicReject(deliveryTag, true);
} catch (IOException ex) {
throw new AmqpIOException(ex);
}
}
然而,只有在 TX 提交和该提交的 Expception 上才能到达此块。因此,在这种情况下我们会导致 TX 回滚:
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable {
Channel channel = consumer.getChannel();
for (int i = 0; i < txSize; i++) {
logger.trace("Waiting for message from consumer.");
Message message = consumer.nextMessage(receiveTimeout);
if (message == null) {
break;
}
try {
executeListener(channel, message);
}
catch (ImmediateAcknowledgeAmqpException e) {
break;
}
catch (Throwable ex) {
consumer.rollbackOnExceptionIfNecessary(ex);
throw ex;
}
}
return consumer.commitIfNecessary(isChannelLocallyTransacted(channel));
注意最后一行。 consumer.rollbackOnExceptionIfNecessary(ex);
是从侦听器抛出异常的情况。否则我们到达最后一行并等待外部 TX 提交。
如果您遇到这种情况,请告诉我们。
非常感谢您的回答。我做了一些其他的睾丸,现在我有了相同配置的预期行为(删除了 errorHandle)。
FYK,在我的一项测试中,我检测到发生重新投递的情况,这可能与您所说的有关:
流:Listener
-> Facade
-> Service
。所有交易。
如果 Service
抛出 RuntimeException
并且我陷入 Facade
并且没有重新抛出(吞下异常),则消息将重新传递。看起来 tx 被回滚,即使我吞下了异常并且消息被重新传递 - 忽略 requeue-rejected
属性.
再次感谢。
我将 Spring 与 RabbitMQ 一起使用,我试图避免在发生运行时异常时重新传递消息。我试图在 listener-container
中将 requeue-reject
设置为 false
并配置一个自定义错误处理程序来抛出 AmqpRejectAndDontRequeueException
。似乎这两种策略都失败了,消息会永远重新传递。有什么想法吗?
感谢您的帮助。
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.1.xsd">
<rabbit:connection-factory id="rabbitMQConnectionFactory" host="localhost" port="5672" username="guest" password="guest" />
<rabbit:admin connection-factory="rabbitMQConnectionFactory" />
<rabbit:template id="amqpTemplate" connection-factory="rabbitMQConnectionFactory" />
<rabbit:queue name="q1" />
<rabbit:queue name="q2" />
<rabbit:listener-container error-handler="errorHandler" connection-factory="rabbitMQConnectionFactory" concurrency="10" transaction-manager="transactionManager" requeue-rejected="false">
<rabbit:listener ref="q1Listener" method="consumeMessage" queue-names="q1" />
<rabbit:listener ref="q2Listener" method="consumeMessage" queue-names="q2" />
</rabbit:listener-container>
<bean id="errorHandler" class="ErrorHandler" />
<bean id="q1Listener" class="Q1MessageConsumerBean" />
<bean id="q2Listener" class="Q2MessageConsumerBean" />
</beans>
首先,requeue-rejected="false"
和 AmqpRejectAndDontRequeueException
对来自目标侦听器的 RuntimeException
执行相同的操作。
这是一个测试用例,表明 requeue-rejected="false"
运行良好。
<rabbit:connection-factory id="connectionFactory" host="localhost" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="foo" routing-key="foo" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="foo" />
<rabbit:direct-exchange name="foo">
<rabbit:bindings>
<rabbit:binding queue="foo" key="foo" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container connection-factory="connectionFactory" auto-startup="false" requeue-rejected="false">
<rabbit:listener ref="listener" queue-names="foo" />
</rabbit:listener-container>
<bean id="listener" class="org.mockito.Mockito" factory-method="spy">
<constructor-arg>
<bean class="org.springframework.amqp.rabbit.listener.RejectedTests$ThrowListener" />
</constructor-arg>
</bean>
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private SimpleMessageListenerContainer container;
@Autowired
private ThrowListener throwListener;
@Test
public void test() throws Exception {
rabbitTemplate.convertAndSend("foo");
container.start();
Thread.sleep(2000);
Mockito.verify(throwListener).onMessage(Mockito.any(Message.class));
}
public static class ThrowListener implements MessageListener {
@Override
public void onMessage(Message message) {
throw new RuntimeException("intentional reject");
}
}
Mockito.verify(throwListener).onMessage(Mockito.any(Message.class));
确认 onMessage
仅在首次交付时被调用一次。第二次传递没有发生,因为我们的消息被 RabbitMQ Broker 拒绝并丢弃。
我只看到一个地方,它不是独立于 requeue-rejected="false"
- RabbitResourceHolder#rollbackAll()
:
for (Long deliveryTag : deliveryTags.get(channel)) {
try {
channel.basicReject(deliveryTag, true);
} catch (IOException ex) {
throw new AmqpIOException(ex);
}
}
然而,只有在 TX 提交和该提交的 Expception 上才能到达此块。因此,在这种情况下我们会导致 TX 回滚:
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable {
Channel channel = consumer.getChannel();
for (int i = 0; i < txSize; i++) {
logger.trace("Waiting for message from consumer.");
Message message = consumer.nextMessage(receiveTimeout);
if (message == null) {
break;
}
try {
executeListener(channel, message);
}
catch (ImmediateAcknowledgeAmqpException e) {
break;
}
catch (Throwable ex) {
consumer.rollbackOnExceptionIfNecessary(ex);
throw ex;
}
}
return consumer.commitIfNecessary(isChannelLocallyTransacted(channel));
注意最后一行。 consumer.rollbackOnExceptionIfNecessary(ex);
是从侦听器抛出异常的情况。否则我们到达最后一行并等待外部 TX 提交。
如果您遇到这种情况,请告诉我们。
非常感谢您的回答。我做了一些其他的睾丸,现在我有了相同配置的预期行为(删除了 errorHandle)。
FYK,在我的一项测试中,我检测到发生重新投递的情况,这可能与您所说的有关:
流:Listener
-> Facade
-> Service
。所有交易。
如果 Service
抛出 RuntimeException
并且我陷入 Facade
并且没有重新抛出(吞下异常),则消息将重新传递。看起来 tx 被回滚,即使我吞下了异常并且消息被重新传递 - 忽略 requeue-rejected
属性.
再次感谢。