Why is there a delay in Spring AMQP message dispatching from a filled queue?
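I'm using Spring AMQP in a message-driven application. I've noticed a nearly constant delay of about 300 ms between invocations of my message listener, even though I'm sure the queue is filled with messages.
The log below shows the delay between BlockingQueueConsumer.nextMessage and BlockingQueueConsumer.handle, during which another thread invokes BlockingQueueConsumer.handleDelivery: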
2015-05-12 12:46:18,655 TRACE [SimpleAsyncTaskExecutor-1] SimpleMessageListenerContainer.doReceiveAndExecute Waiting for message from consumer.
2015-05-12 12:46:18,655 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.nextMessage Retrieving delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:18,967 DEBUG [pool-1-thread-6 ] BlockingQueueConsumer.handleDelivery Storing delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:18,967 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.handle Received message: (Body:'[B@18dc305(byte[186])'MessageProperties [headers={..headers..}, timestamp=Tue May 12 01:16:06 CEST 2015, messageId=143134227498011576, userId=null, appId=SPT-T-2, clusterId=null, type=HBT, correlationId=null, replyTo=null, contentType=text, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=incoming, receivedRoutingKey=my-queue, deliveryTag=8, messageCount=0])
2015-05-12 12:46:18,967 INFO [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Incoming
2015-05-12 12:46:18,967 INFO [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Done
2015-05-12 12:46:18,967 TRACE [SimpleAsyncTaskExecutor-1] SimpleMessageListenerContainer.doReceiveAndExecute Waiting for message from consumer.
2015-05-12 12:46:18,967 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.nextMessage Retrieving delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:19,280 DEBUG [pool-1-thread-7 ] BlockingQueueConsumer.handleDelivery Storing delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:19,280 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.handle Received message: (Body:'[B@1aaa7d8(byte[186])'MessageProperties [headers={..headers..}, timestamp=Tue May 12 01:17:08 CEST 2015, messageId=143134227498011584, userId=null, appId=SPT-T-2, clusterId=null, type=HBT, correlationId=null, replyTo=null, contentType=text, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=incoming, receivedRoutingKey=my-queue, deliveryTag=9, messageCount=0])
2015-05-12 12:46:19,280 INFO [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Incoming
2015-05-12 12:46:19,280 INFO [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Done
2015-05-12 12:46:19,280 TRACE [SimpleAsyncTaskExecutor-1] SimpleMessageListenerContainer.doReceiveAndExecute Waiting for message from consumer.
2015-05-12 12:46:19,280 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.nextMessage Retrieving delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:19,577 DEBUG [pool-1-thread-3 ] BlockingQueueConsumer.handleDelivery Storing delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:19,577 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.handle Received message: (Body:'[B@1c893d2(byte[186])'MessageProperties [headers={..headers..}, timestamp=Tue May 12 01:18:07 CEST 2015, messageId=143134227498011592, userId=null, appId=SPT-T-2, clusterId=null, type=HBT, correlationId=null, replyTo=null, contentType=text, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=incoming, receivedRoutingKey=my-queue, deliveryTag=10, messageCount=0])
2015-05-12 12:46:19,577 INFO [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Incoming
2015-05-12 12:46:19,577 INFO [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Done
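The log shows message processing at a time when the queue was definitely filled with messages. The relevant part of my Spring configuration looks like this: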
<rabbit:connection-factory id="amqpConnectionFactory" connection-factory="clientConnectionFactory"
    host="${amqp.broker.ip}"
    port="${amqp.broker.port}"
    virtual-host="${amqp.broker.vhost}"
    username="${amqp.user}"
    password="${amqp.password}"/>

<bean id="clientConnectionFactory" class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
    <property name="useSSL" value="true" />
    <property name="sslPropertiesLocation" value="classpath:server.ini"/>
</bean>

<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="amqpConnectionFactory" />
    <property name="messageConverter" ref="marshallingMessageConverter"/>
</bean>

<bean id="marshallingMessageConverter" class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
    <constructor-arg ref="jaxbMarshaller" />
</bean>

<oxm:jaxb2-marshaller id="jaxbMarshaller" context-path="com.my.package"/>

<rabbit:listener-container id="heartbeatListenerContainer" connection-factory="amqpConnectionFactory" auto-startup="false">
    <rabbit:listener ref="queueMessageHandler" queue-names="heartbeat-bdwh" />
</rabbit:listener-container>

<bean id="queueMessageHandler" class="com.my.package.QueueMessageHandler"/>
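I'm struggling to find the cause of the delay. As far as I understand, it originates in Spring's BlockingQueueConsumer. I'm not sure what is going on, or why BlockingQueueConsumer.handleDelivery is invoked from a different thread.
Any help is much appreciated!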
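Could it be the network?
The default configuration processes 1 message at a time, and the broker won't send the next message until the ack has been sent.
Try increasing the prefetch on the listener container so that a message is always available in the container when the consumer thread is ready.
Take a look at a network trace (Wireshark or similar).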
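EDIT:
If your network is poor and you can live with an increased possibility of duplicate deliveries, you might also consider increasing the txSize so that an ack is not sent for every message. Be sure to set it to something less than the prefetch, though.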
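As a rough sketch, both suggestions could be applied to the listener container from the question as shown below. It assumes the Spring AMQP 1.x XML namespace, where the prefetch and transaction-size attributes correspond to the container's prefetch and txSize settings (later releases may name the latter differently). The values 50 and 10 are illustrative assumptions, not tuned recommendations.

```xml
<!-- Sketch only: prefetch="50" and transaction-size="10" are assumed example values. -->
<!-- prefetch: how many unacknowledged messages the broker may push to the consumer ahead of time. -->
<!-- transaction-size: how many messages are processed before a single ack is sent; kept below prefetch. -->
<rabbit:listener-container id="heartbeatListenerContainer"
        connection-factory="amqpConnectionFactory"
        auto-startup="false"
        prefetch="50"
        transaction-size="10">
    <rabbit:listener ref="queueMessageHandler" queue-names="heartbeat-bdwh" />
</rabbit:listener-container>
```

With settings like these, the listener thread should usually find the next delivery already waiting in the container's local queue instead of waiting a network round trip per message. The trade-off is that a failure before the batched ack can cause the whole unacknowledged batch to be redelivered.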