RabbitMQ 通道空闲问题 |如何恢复未确认的 AMQP 消息 | Java客户端消费者
RabbitMQ channels idling issue | How to recover unacked AMQP messages | Javaclient consumer
我是 rabbitmq 的消费者并使用 spring-amqp。现在,当我进入管理员界面时,所有连接都显示 运行 但其中的通道都处于空闲状态 (Prefetch:250, Unacked:250) 。能否请你帮忙?
如何正确使用这个预取?
我需要关闭连接吗?
如何增加每个连接的通道数。现在每个连接只有一个通道。以下是代码配置片段。我正在使用 out if box spring amqp 配置用于大多数事情。我还使用自定义 rabbitmq 消息侦听器来确认或取消消息。
<!-- RabbitMQ configuration -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.vhosts}" requested-heartbeat="${rabbitmq.requestedHeartBeat}"/>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" message-converter="rabbitJsonConverter"/>
channel.Close
<bean id="rabbitJsonConverter" class="rabbitmq.messages.converter.CustomJackson2JsonMessageConverter">
<property name="classMapper">
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
<property name="defaultType" value="rabbitmq.messages.custom.dto.CustomRabbitMQMessage"/>
</bean>
</property>
</bean>
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" requeue-rejected="true">
<rabbit:listener queue-names="${rabbitmq.queuename}" ref="customRabbitMQMessageListener" method="onMessage"/>
</rabbit:listener-container>
<bean id="customRabbitMQMessageListener" class="rabbitmq.messages.listener.CustomRabbitMQMessageListener" >
<property name="customerAccountService" ref="customerAccountService" />
</bean>
**Listener Code**
LOG.debug("***** LISTENING RABBITMQ MESSAGES START******");
channel.basicRecover(true);
try {
boolean ack = performOperationsOnMessage(msg);
if (ack) {
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} else
channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);
LOG.debug("***** LISTENING RABBITMQ MESSAGES FINISHED******");
} catch (Exception exp) {
LOG.error("Exception occured during perform Change operation, RabbitMQ message: " + exp.getMessage(), exp);
channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);
}
private boolean performOperationsOnMessage(Message msg) {
RabbitMQMessage message = null;
try {
message = (RabbitMQMessage) rabbitJsonConverter.fromMessage(msg);
} catch (MessageConversionException exp) {
LOG.warn("Exception occurred during the conversion or any other issue", exp);
return true;
}
if (message == null || message.getOperation() == null || message.getResource() == null || message.getResource().getUuid() == null) {
LOG.warn("Received an empty message or emptry operation or empty resource or empty uuid from queue ");
return true;
}
if (message.getOperation().equals(RabbitMQMessage.RossoOperation.remove.name())) {
return performRemoveOperation(message);
}
if (message.getOperation().equals(RabbitMQMessage.RossoOperation.change.name())) {
return performChangeOperation(message);
}
return true;
}
所以这个问题的解决方案是在配置为手动确认的侦听器代码中。逻辑中有一些分支使侦听器无法确认某些消息,这就是通道上的未确认计数达到预取 (250) 的方式,导致 RabbitMQ 停止向通道发送消息。
修复:正如您在问题中看到的更新后的侦听器代码,它永远不会留下任何未确认的消息。
同样在否定确认 channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true) 中,requeue(签名中的最后一个变量)应该为 true,以便消息可以重新排队回到同一个队列
<!-- RabbitMQ configuration -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.vhosts}" requested-heartbeat="${rabbitmq.requestedHeartBeat}"/>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" message-converter="rabbitJsonConverter"/>
channel.Close
<bean id="rabbitJsonConverter" class="rabbitmq.messages.converter.CustomJackson2JsonMessageConverter">
<property name="classMapper">
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
<property name="defaultType" value="rabbitmq.messages.custom.dto.CustomRabbitMQMessage"/>
</bean>
</property>
</bean>
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" requeue-rejected="true">
<rabbit:listener queue-names="${rabbitmq.queuename}" ref="customRabbitMQMessageListener" method="onMessage"/>
</rabbit:listener-container>
<bean id="customRabbitMQMessageListener" class="rabbitmq.messages.listener.CustomRabbitMQMessageListener" >
<property name="customerAccountService" ref="customerAccountService" />
</bean>
**Listener Code**
LOG.debug("***** LISTENING RABBITMQ MESSAGES START******");
channel.basicRecover(true);
try {
boolean ack = performOperationsOnMessage(msg);
if (ack) {
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} else
channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);
LOG.debug("***** LISTENING RABBITMQ MESSAGES FINISHED******");
} catch (Exception exp) {
LOG.error("Exception occured during perform Change operation, RabbitMQ message: " + exp.getMessage(), exp);
channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);
}
private boolean performOperationsOnMessage(Message msg) {
RabbitMQMessage message = null;
try {
message = (RabbitMQMessage) rabbitJsonConverter.fromMessage(msg);
} catch (MessageConversionException exp) {
LOG.warn("Exception occurred during the conversion or any other issue", exp);
return true;
}
if (message == null || message.getOperation() == null || message.getResource() == null || message.getResource().getUuid() == null) {
LOG.warn("Received an empty message or emptry operation or empty resource or empty uuid from queue ");
return true;
}
if (message.getOperation().equals(RabbitMQMessage.RossoOperation.remove.name())) {
return performRemoveOperation(message);
}
if (message.getOperation().equals(RabbitMQMessage.RossoOperation.change.name())) {
return performChangeOperation(message);
}
return true;
}
所以这个问题的解决方案是在配置为手动确认的侦听器代码中。逻辑中有一些分支使侦听器无法确认某些消息,这就是通道上的未确认计数达到预取 (250) 的方式,导致 RabbitMQ 停止向通道发送消息。
修复:正如您在问题中看到的更新后的侦听器代码,它永远不会留下任何未确认的消息。 同样在否定确认 channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true) 中,requeue(签名中的最后一个变量)应该为 true,以便消息可以重新排队回到同一个队列