Rabbitmq + Spring AMQP:重新发送消息但没有确认 return
Rabbitmq + Spring AMQP: resend msg but no ack return
我使用spring amqp发送msg,并在ack=false时添加重发逻辑,使用相同的rabbitTemplate
@Bean
public RabbitTemplate customRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
RabbitMetaMessage metaMessage = (RabbitMetaMessage) mqMsgUtil.getMetaMsg(cacheKey);
//1 receive ack
if (ack) {
// send success
mqMsgUtil.setMsgSuccess(cacheKey);
SUCESS_SEND = true;
// send failed
} else {
reSendMsg(cacheKey, metaMessage, rabbitTemplate);
}
});
public void reSendMsg(String msgId, RabbitMetaMessage rabbitMetaMessage,RabbitTemplate rabbitTemplate) {
rabbitTemplate.convertAndSend(rabbitMetaMessage)
.....
}
我第一次发送消息时可以“1 receive ack”,但是当在seSendMsg 中再次使用RabbitTemplate 发送消息时,我无法再次接收到ack。这是怎么回事?
不允许在ack回调上发送消息;它会导致客户端库出现死锁。您需要在不同的线程上重新发送。
在下一个版本 (2.1) 中,我们更改了代码以在不同的线程上调用回调,以避免用户代码必须这样做。参见 AMQP-817 and its linked 。
我使用spring amqp发送msg,并在ack=false时添加重发逻辑,使用相同的rabbitTemplate
@Bean
public RabbitTemplate customRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
RabbitMetaMessage metaMessage = (RabbitMetaMessage) mqMsgUtil.getMetaMsg(cacheKey);
//1 receive ack
if (ack) {
// send success
mqMsgUtil.setMsgSuccess(cacheKey);
SUCESS_SEND = true;
// send failed
} else {
reSendMsg(cacheKey, metaMessage, rabbitTemplate);
}
});
public void reSendMsg(String msgId, RabbitMetaMessage rabbitMetaMessage,RabbitTemplate rabbitTemplate) {
rabbitTemplate.convertAndSend(rabbitMetaMessage)
.....
}
我第一次发送消息时可以“1 receive ack”,但是当在seSendMsg 中再次使用RabbitTemplate 发送消息时,我无法再次接收到ack。这是怎么回事?
不允许在ack回调上发送消息;它会导致客户端库出现死锁。您需要在不同的线程上重新发送。
在下一个版本 (2.1) 中,我们更改了代码以在不同的线程上调用回调,以避免用户代码必须这样做。参见 AMQP-817 and its linked