当启用发布者确认时,设置了队列长度限制并且溢出设置为拒绝发布,为什么我收到的确认回调为空?
When publisher confirms are enabled, queue length limit is set and overflow is set to reject-publish,why cause in confirm callback I received is null?
我正在学习队列长度限制(https://www.rabbitmq.com/maxlength.html),正如它所说,队列设置为'x-max-length:10'
和'x-overflow:reject-publish'
,而且,我启用publisher confirms
.因此,当队列中的消息数达到 10 时,将通过 basic.nack
消息通知发布者拒绝。
它是:我的确认回调得到了一个错误的确认,但是 cause
是 null
。我想知道不应该 return 一些东西以便我可以区分这种情况。部分代码如下:
@Bean
public AmqpTemplate amqpTemplate(@Autowired CachingConnectionFactory amqpConnectionFactory) {
amqpConnectionFactory.setPublisherReturns(true);
amqpConnectionFactory.setPublisherConfirms(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(amqpConnectionFactory);
rabbitTemplate.setMessageConverter(jsonMessageConverter());
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
return rabbitTemplate;
}
static RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println(ack); // when number of messages reach 10, print false
System.out.println(cause); // when number of messages reach 10, print null
}
};
@Bean
public Queue queue() {
return QueueBuilder.durable(DURABLE_QUEUE).withArgument("x-max-length", 10).withArgument("x-overflow", "reject-publish").build();
}
@Scheduled(fixedDelay = 1000L)
public void produce() {
Message msg = new Message(UUID.randomUUID().toString(), "sth");
amqpTemplate.convertAndSend("sth", "sth", msg );
}
不幸的是,AMQP 协议和 Java 客户端没有提供有关发布失败原因的信息。仅ack/nack及是否为多条消息确认:
/**
* Implement this interface in order to be notified of Confirm events.
* Acks represent messages handled successfully; Nacks represent
* messages lost by the broker. Note, the lost messages could still
* have been delivered to consumers, but the broker cannot guarantee
* this.
* For a lambda-oriented syntax, use {@link ConfirmCallback}.
*/
public interface ConfirmListener {
void handleAck(long deliveryTag, boolean multiple)
throws IOException;
void handleNack(long deliveryTag, boolean multiple)
throws IOException;
}
我们添加了 cause
,因为在某些情况下,框架会合成一个 nack(例如,当我们在等待确认时关闭通道时,我们添加 Channel closed by application
作为 cause
.
框架无法推测我们从代理那里得到 nack 的原因。
Spring RabbitMQ 确实提供了有关发布失败原因的信息。将 spring.rabbitmq.publisher-returns
设置为 true
。
我正在学习队列长度限制(https://www.rabbitmq.com/maxlength.html),正如它所说,队列设置为'x-max-length:10'
和'x-overflow:reject-publish'
,而且,我启用publisher confirms
.因此,当队列中的消息数达到 10 时,将通过 basic.nack
消息通知发布者拒绝。
它是:我的确认回调得到了一个错误的确认,但是 cause
是 null
。我想知道不应该 return 一些东西以便我可以区分这种情况。部分代码如下:
@Bean
public AmqpTemplate amqpTemplate(@Autowired CachingConnectionFactory amqpConnectionFactory) {
amqpConnectionFactory.setPublisherReturns(true);
amqpConnectionFactory.setPublisherConfirms(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(amqpConnectionFactory);
rabbitTemplate.setMessageConverter(jsonMessageConverter());
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
return rabbitTemplate;
}
static RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println(ack); // when number of messages reach 10, print false
System.out.println(cause); // when number of messages reach 10, print null
}
};
@Bean
public Queue queue() {
return QueueBuilder.durable(DURABLE_QUEUE).withArgument("x-max-length", 10).withArgument("x-overflow", "reject-publish").build();
}
@Scheduled(fixedDelay = 1000L)
public void produce() {
Message msg = new Message(UUID.randomUUID().toString(), "sth");
amqpTemplate.convertAndSend("sth", "sth", msg );
}
不幸的是,AMQP 协议和 Java 客户端没有提供有关发布失败原因的信息。仅ack/nack及是否为多条消息确认:
/**
* Implement this interface in order to be notified of Confirm events.
* Acks represent messages handled successfully; Nacks represent
* messages lost by the broker. Note, the lost messages could still
* have been delivered to consumers, but the broker cannot guarantee
* this.
* For a lambda-oriented syntax, use {@link ConfirmCallback}.
*/
public interface ConfirmListener {
void handleAck(long deliveryTag, boolean multiple)
throws IOException;
void handleNack(long deliveryTag, boolean multiple)
throws IOException;
}
我们添加了 cause
,因为在某些情况下,框架会合成一个 nack(例如,当我们在等待确认时关闭通道时,我们添加 Channel closed by application
作为 cause
.
框架无法推测我们从代理那里得到 nack 的原因。
Spring RabbitMQ 确实提供了有关发布失败原因的信息。将 spring.rabbitmq.publisher-returns
设置为 true
。