Spring AMQP (Rabbit) listener goes into a loop when an exception is thrown
@Bean
RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory());
    template.setMessageConverter(messageConverter);
    template.setExchange(amqpProperties.getRabbitMqTopicExchangeName());
    return template;
}

@Bean
@Conditional(OperationsCondition.class)
SimpleMessageListenerContainer opsMessageListenerContainer() {
    return listenerContainer(amqpProperties.getRabbitMqOperationsQueue(),
            amqpProperties.getInitialRabbitOperationsConsumerCount(),
            amqpProperties.getMaximumRabbitOperationsConsumerCount(),
            opsReceiver());
}

@Bean
@Conditional(OperationsCondition.class)
OperationsListener opsReceiver() {
    return new OperationsListener();
}

private SimpleMessageListenerContainer listenerContainer(String queue,
        int initConsumers, int maxConsumers, MessageListener listener) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory());
    container.setQueueNames(queue);
    container.setMessageListener(listener);
    container.setConcurrentConsumers(initConsumers);
    container.setMaxConcurrentConsumers(maxConsumers);
    container.setMessageConverter(messageConverter);
    return container;
}

The message listener is:

public class OperationsListener implements MessageListener {

    public static final Logger logger = Logger.getInstance(OperationsListener.class);

    @Autowired(required = true)
    private OperationsProcessor processor;

    @Autowired(required = true)
    private ObjectMapper objectMapper;

    public void onMessage(Message message) {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        converter.setJsonObjectMapper(objectMapper);
        OperationsMessage request = (OperationsMessage) converter.fromMessage(message);
        processor.createMessage(request);
        // This is throwing a JPA database exception
        processor.createOperation(request);
    }
}
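
processor.createOperation() throws an exception because of a database problem. The problem is that the message listener goes into a loop and the same message keeps coming back.

My processor class:
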
@Component
@Transactional(propagation = Propagation.REQUIRES_NEW)
public class OperationsProcessor {
    ...............
    public void createOperation(OperationsMessage message) {
        try {
            .............
            .............
            // this call throws exception.
            opsRepo.create(operation, null);
        } catch (Exception e) {
            logger.error(e);
        }
    }
}
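
opsRepo.create throws an exception. Even though I am catching the error, I expected Spring AMQP not to deliver the message again. I am not sure why the same message keeps coming back.

Edit:

I think I found some suggestions on how to handle this. The reason is that Spring requeues the message when the listener fails, which is the default behavior.

I found a helpful thread, and another here.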
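
For anyone landing here, this is a minimal sketch of the two usual ways to turn off the requeue, as far as I understand them. The class name NoRequeueSketch and the helper rejectingOnFailure are made up for illustration and are not part of my project; setDefaultRequeueRejected and AmqpRejectAndDontRequeueException come from Spring AMQP. I have not confirmed that this is what the linked threads recommend, so treat it as an assumption rather than the definitive fix.

```java
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

// Hypothetical holder class, only here to keep the sketch self-contained.
public class NoRequeueSketch {

    // Option 1: container-wide. Any message whose listener throws is
    // rejected without being put back on the queue.
    static SimpleMessageListenerContainer listenerContainer(
            ConnectionFactory connectionFactory, String queue, MessageListener listener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queue);
        container.setMessageListener(listener);
        // The default is true, which is what produces the redelivery loop.
        container.setDefaultRequeueRejected(false);
        return container;
    }

    // Option 2: per message. Leave the container default alone and signal
    // "do not requeue" by wrapping the failure in this specific exception.
    static MessageListener rejectingOnFailure(MessageListener delegate) {
        return message -> {
            try {
                delegate.onMessage(message);
            } catch (RuntimeException e) {
                throw new AmqpRejectAndDontRequeueException(
                        "Processing failed, message will not be requeued", e);
            }
        };
    }
}
```

With either option, a rejected message is dropped by the broker unless the queue is configured with a dead-letter exchange, so it is probably worth setting one up before switching requeue off.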