我怎样才能阻止我的 SimpleMessageListenerContainer 陷入 shutdown/restart 循环?
How can i stop my SimpleMessageListenerContainer from getting stuck in shutdown/restart loop?
我有一个与 RabbitMQ 一起使用的 SimpleMessageListenerContainer 和 Java。在大多数情况下,我没有遇到任何问题,但是在某些情况下,当消息发送到 queue 时,似乎出现异常导致 SMLC 进入循环尝试关闭然后重新启动 queue.
[10/03/15 17:09:38:161 UTC] 00000246 SimpleMessage W
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer
run Consumer raised exception, processing can restart if the
connection factory supports it. Exception summary:
org.springframework.amqp.AmqpIOException: java.io.IOException
[10/03/15 17:09:38:189 UTC] 00000246 SimpleMessage I
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer
run Restarting Consumer: tag=[null], channel=Cached Rabbit Channel:
AMQChannel(amqp://epa_devint1@xx.xx.xx.xx:5782/,1),
acknowledgeMode=AUTO local queue size=0
[10/03/15 17:09:39:164 UTC] 00000256 SimpleMessage W
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer
run Consumer raised exception, processing can restart if the
connection factory supports it. Exception summary:
com.rabbitmq.client.ShutdownSignalException: connection error; reason:
{#method(reply-code=541, reply-text=INTERNAL_ERROR,
class-id=0, method-id=0), null, ""}
当我通过管理界面从 queue 中删除消息时,不再有异常。
我认为异常的原因是缺少 header 属性.
处理此异常的正确方法是什么,以便从 queue 中删除消息并退出 shutdown/restart 逻辑?
public SimpleMessageListenerContainer createMessageListenerContainer(Object consumer, String exchangeName, String queueName, String routingKey) {
TopicExchange exchange = new TopicExchange(exchangeName);
Queue queue = new Queue(queueName,
MessagingConstants.RABBIT_MQ_QUEUE_DURABLE,
MessagingConstants.RABBIT_MQ_QUEUE_EXCLUSIVE,
MessagingConstants.RABBIT_MQ_QUEUE_AUTO_DELETE,
MessagingConstants.RABBIT_MQ_QUEUE_ARGUMENTS);
amqpAdmin.declareExchange(exchange);
amqpAdmin.declareQueue(queue);
amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue);
container.setConcurrentConsumers(MessagingConstants.RABBIT_MQ_CONCURRENT_CONSUMERS);
container.setErrorHandler(errorHandler);
container.setMessageListener(new MessageListenerAdapter(consumer, null));
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setAdviceChain(retryAdviceChainFactory.createRequestRequeueExceptionAwareRetryChain(MessagingConstants.RABBIT_MQ_RETRY_ATTEMPTS));
container.setChannelTransacted(true);
container.setTaskExecutor(taskExecutor);
container.setTransactionManager(transactionManager);
return container;
}
@Override
public void afterPropertiesSet() {
com.rabbitmq.client.ConnectionFactory rabbitFactory = new com.rabbitmq.client.ConnectionFactory() {
protected void configureSocket(Socket socket) throws IOException {
super.configureSocket(socket);
socket.setSoTimeout(propertiesHolder.getRabbitMQSocketTimeoutMS());
}
};
rabbitFactory.setConnectionTimeout(propertiesHolder.getRabbitMQConnectionTimeoutMS());
rabbitFactory.setRequestedHeartbeat(propertiesHolder.getRabbitMQRequestedHeartbeatInSeconds());
CachingConnectionFactory cachingFactory = new CachingConnectionFactory(rabbitFactory);
cachingFactory.setAddresses(propertiesHolder.getRabbitMQHost());
cachingFactory.setPort(propertiesHolder.getRabbitMQPort());
cachingFactory.setUsername(propertiesHolder.getRabbitMQUsername());
cachingFactory.setPassword(propertiesHolder.getRabbitMQPassword());
cachingFactory.setChannelCacheSize(propertiesHolder.getRabbitMQChannelCacheSize());
connectionFactory = cachingFactory;
retryAdviceChainFactory = new RetryAdviceChainFactory();
amqpAdmin = new RabbitAdmin(connectionFactory);
errorHandler = new ErrorHandler() {
@Override
public void handleError(Throwable e) {
LOG.error("Error occurred", e);
}
};
TopicExchange deadLetterExchange = new TopicExchange(MessagingConstants.RABBIT_MQ_DEAD_LETTER_EXCHANGE);
Queue deadLetterQueue = new Queue(
MessagingConstants.RABBIT_MQ_DEAD_LETTER_QUEUE,
MessagingConstants.RABBIT_MQ_QUEUE_DURABLE,
MessagingConstants.RABBIT_MQ_QUEUE_EXCLUSIVE,
MessagingConstants.RABBIT_MQ_QUEUE_AUTO_DELETE,
MessagingConstants.RABBIT_MQ_QUEUE_ARGUMENTS);
amqpAdmin.declareExchange(deadLetterExchange);
amqpAdmin.declareQueue(deadLetterQueue);
amqpAdmin.declareBinding(BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("#"));
messageRecoverer = new DeadLetterMessageRecoverer(rabbitTemplate(), deadLetterExchange);
}
JDK: 1.6
spring-rabbit: 1.1.3.Release
spring-framework: 3.2.1.Release
您的问题是由消息处理的事务性质引起的。我的意思是:
container.setTransactionManager(transactionManager);
发生的情况是,在某些情况下(可能是消息 header 的一些问题,正如您所建议的)您遇到了一些错误,并且没有得到正确处理。错误传播并到达 txManager,后者依次:
- 不确认消息,return将消息发送给queue
- 认为您当前的连接已损坏,并关闭它,以便它可以打开一个全新的连接
现在,当您遇到的问题不是您的消息的直接后果时,上述行为绝对有意义。假设您在处理消息时遇到超时;应用程序在处理它时正在关闭;在所有这些情况下,稍后重新传送消息或将消息传送到另一个节点确实有意义。
然而,在您的情况下,什么时候和哪个节点收到错误消息无关紧要,您将遇到同样的问题,您需要手动删除该消息。为了避免这种情况你可以:
以编程方式过滤掉无效消息,并且不通过
将它们return发送到queue
- 在处理之前进行检查(防止发生致命错误)
- 以不同的方式处理此类致命错误(正确的错误处理)
根据您的要求使用 RabbitMQ 功能来处理错误
- 死信Queue
- 经纪人端验证
- 可能 TTL 也可以
我有一个与 RabbitMQ 一起使用的 SimpleMessageListenerContainer 和 Java。在大多数情况下,我没有遇到任何问题,但是在某些情况下,当消息发送到 queue 时,似乎出现异常导致 SMLC 进入循环尝试关闭然后重新启动 queue.
[10/03/15 17:09:38:161 UTC] 00000246 SimpleMessage W org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer run Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpIOException: java.io.IOException
[10/03/15 17:09:38:189 UTC] 00000246 SimpleMessage I org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer run Restarting Consumer: tag=[null], channel=Cached Rabbit Channel: AMQChannel(amqp://epa_devint1@xx.xx.xx.xx:5782/,1), acknowledgeMode=AUTO local queue size=0
[10/03/15 17:09:39:164 UTC] 00000256 SimpleMessage W org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer run Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: com.rabbitmq.client.ShutdownSignalException: connection error; reason: {#method(reply-code=541, reply-text=INTERNAL_ERROR, class-id=0, method-id=0), null, ""}
当我通过管理界面从 queue 中删除消息时,不再有异常。
我认为异常的原因是缺少 header 属性.
处理此异常的正确方法是什么,以便从 queue 中删除消息并退出 shutdown/restart 逻辑?
public SimpleMessageListenerContainer createMessageListenerContainer(Object consumer, String exchangeName, String queueName, String routingKey) {
TopicExchange exchange = new TopicExchange(exchangeName);
Queue queue = new Queue(queueName,
MessagingConstants.RABBIT_MQ_QUEUE_DURABLE,
MessagingConstants.RABBIT_MQ_QUEUE_EXCLUSIVE,
MessagingConstants.RABBIT_MQ_QUEUE_AUTO_DELETE,
MessagingConstants.RABBIT_MQ_QUEUE_ARGUMENTS);
amqpAdmin.declareExchange(exchange);
amqpAdmin.declareQueue(queue);
amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue);
container.setConcurrentConsumers(MessagingConstants.RABBIT_MQ_CONCURRENT_CONSUMERS);
container.setErrorHandler(errorHandler);
container.setMessageListener(new MessageListenerAdapter(consumer, null));
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setAdviceChain(retryAdviceChainFactory.createRequestRequeueExceptionAwareRetryChain(MessagingConstants.RABBIT_MQ_RETRY_ATTEMPTS));
container.setChannelTransacted(true);
container.setTaskExecutor(taskExecutor);
container.setTransactionManager(transactionManager);
return container;
}
@Override
public void afterPropertiesSet() {
com.rabbitmq.client.ConnectionFactory rabbitFactory = new com.rabbitmq.client.ConnectionFactory() {
protected void configureSocket(Socket socket) throws IOException {
super.configureSocket(socket);
socket.setSoTimeout(propertiesHolder.getRabbitMQSocketTimeoutMS());
}
};
rabbitFactory.setConnectionTimeout(propertiesHolder.getRabbitMQConnectionTimeoutMS());
rabbitFactory.setRequestedHeartbeat(propertiesHolder.getRabbitMQRequestedHeartbeatInSeconds());
CachingConnectionFactory cachingFactory = new CachingConnectionFactory(rabbitFactory);
cachingFactory.setAddresses(propertiesHolder.getRabbitMQHost());
cachingFactory.setPort(propertiesHolder.getRabbitMQPort());
cachingFactory.setUsername(propertiesHolder.getRabbitMQUsername());
cachingFactory.setPassword(propertiesHolder.getRabbitMQPassword());
cachingFactory.setChannelCacheSize(propertiesHolder.getRabbitMQChannelCacheSize());
connectionFactory = cachingFactory;
retryAdviceChainFactory = new RetryAdviceChainFactory();
amqpAdmin = new RabbitAdmin(connectionFactory);
errorHandler = new ErrorHandler() {
@Override
public void handleError(Throwable e) {
LOG.error("Error occurred", e);
}
};
TopicExchange deadLetterExchange = new TopicExchange(MessagingConstants.RABBIT_MQ_DEAD_LETTER_EXCHANGE);
Queue deadLetterQueue = new Queue(
MessagingConstants.RABBIT_MQ_DEAD_LETTER_QUEUE,
MessagingConstants.RABBIT_MQ_QUEUE_DURABLE,
MessagingConstants.RABBIT_MQ_QUEUE_EXCLUSIVE,
MessagingConstants.RABBIT_MQ_QUEUE_AUTO_DELETE,
MessagingConstants.RABBIT_MQ_QUEUE_ARGUMENTS);
amqpAdmin.declareExchange(deadLetterExchange);
amqpAdmin.declareQueue(deadLetterQueue);
amqpAdmin.declareBinding(BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("#"));
messageRecoverer = new DeadLetterMessageRecoverer(rabbitTemplate(), deadLetterExchange);
}
JDK: 1.6 spring-rabbit: 1.1.3.Release spring-framework: 3.2.1.Release
您的问题是由消息处理的事务性质引起的。我的意思是:
container.setTransactionManager(transactionManager);
发生的情况是,在某些情况下(可能是消息 header 的一些问题,正如您所建议的)您遇到了一些错误,并且没有得到正确处理。错误传播并到达 txManager,后者依次:
- 不确认消息,return将消息发送给queue
- 认为您当前的连接已损坏,并关闭它,以便它可以打开一个全新的连接
现在,当您遇到的问题不是您的消息的直接后果时,上述行为绝对有意义。假设您在处理消息时遇到超时;应用程序在处理它时正在关闭;在所有这些情况下,稍后重新传送消息或将消息传送到另一个节点确实有意义。
然而,在您的情况下,什么时候和哪个节点收到错误消息无关紧要,您将遇到同样的问题,您需要手动删除该消息。为了避免这种情况你可以:
以编程方式过滤掉无效消息,并且不通过
将它们return发送到queue- 在处理之前进行检查(防止发生致命错误)
- 以不同的方式处理此类致命错误(正确的错误处理)
根据您的要求使用 RabbitMQ 功能来处理错误
- 死信Queue
- 经纪人端验证
- 可能 TTL 也可以