RabbitMQ doesn't flush acks?
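The problem shows up in the logs: Consumer failed to start in 60000 milliseconds; does the task executor have enough threads to support the container concurrency?
We try to open handlers for about 50 queues dynamically through SimpleMessageListenerContainer.addQueueNames() and then start the application. It consumes some messages, but the RabbitMQ admin panel shows them as unacked. After a long time, with up to 6 unacked messages stacked on each queue (the queues have a fairly low message rate per minute) and about 300 messages in total, something happens and they all get consumed and acked. While the messages are unacked, the container seems to keep trying to load another consumer until it reaches the limit.
We rely on AUTO acknowledge mode right now; when it was MANUAL, there was no problem.
There are two questions:
What could be the reason for the unacked messages? Is there some flush mechanism that fires only rarely?
How do we deal with the "not enough threads" message?
The two really seem to be related.
The setup is as follows: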
@Bean
fun queueMessageListenerContainer(
    connectionFactory: ConnectionFactory,
    retryOperationsInterceptor: RetryOperationsInterceptor,
    vehicleQueueListenerFactory: QueueListenerFactory,
): SimpleMessageListenerContainer {
    return SimpleMessageListenerContainer().also {
        it.connectionFactory = connectionFactory
        it.setConsumerTagStrategy { queueName -> consumerTag(queueName) }
        it.setMessageListener(vehicleQueueListenerFactory.create())
        it.setConcurrentConsumers(2)
        it.setMaxConcurrentConsumers(5)
        it.setListenerId("queue-consumer")
        it.setAdviceChain(retryOperationsInterceptor)
        it.setRecoveryInterval(RABBIT_HEARTH_BEAT.toMillis())
        // had 10-100 threads, didn't help
        it.setTaskExecutor(rabbitConsumersExecutorService)
        // AUTO is supposed to ack the messages, right?
        it.acknowledgeMode = AcknowledgeMode.AUTO
    }
}
@Bean
fun connectionFactory(rabbitProperties: RabbitProperties): AbstractConnectionFactory {
    val rabbitConnectionFactory = com.rabbitmq.client.ConnectionFactory().also { connectionFactory ->
        connectionFactory.isAutomaticRecoveryEnabled = true
        connectionFactory.isTopologyRecoveryEnabled = true
        connectionFactory.networkRecoveryInterval = RABBIT_HEARTH_BEAT.toMillis()
        connectionFactory.requestedHeartbeat = RABBIT_HEARTH_BEAT.toSeconds().toInt()
        // was up to 100 connections, didn't help
        connectionFactory.setSharedExecutor(rabbitConnectionExecutorService)
        connectionFactory.host = rabbitProperties.host
        connectionFactory.port = rabbitProperties.port ?: connectionFactory.port
    }
    return CachingConnectionFactory(rabbitConnectionFactory)
        .also {
            it.cacheMode = rabbitProperties.cache.connection.mode
            it.connectionCacheSize = rabbitProperties.cache.connection.size
            it.setConnectionNameStrategy { "simulation-gateway:${springProfiles.firstOrNull()}:event-consumer" }
        }
}
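class QueueListenerFactory {
    // The return type must be declared; a block-body function otherwise returns Unit
    fun create(): MessageListener {
        return MessageListener {
            try {
                // no explicit ack here; rely on AUTO acknowledge mode
                handle()
            } catch (e: Throwable) {
                ...
            }
        }
    }
}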
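OK, I figured out what the problem was. Basically, the container could not start all the queue consumers in time: not only is SimpleMessageListenerContainer slow at this, but we also called addQueueNames one queue at a time: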
userRepository.findAll()
    .map { user -> queueName(user) }
    .onEach { queueName ->
        simpleContainerListener.addQueueNames(queueName)
    }
However, the following line from the SimpleMessageListenerContainer documentation had gone unnoticed:
The existing consumers will be cancelled after they have processed any pre-fetched messages and new consumers will be created
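This means that every call recreated all the existing consumers, so adding N queues one by one recreated 1, 2, ... N consumers in turn. Worse, when a request came in through the API, we made the very same simpleContainerListener.addQueueNames(queueName) call after handling it, which recreated all the consumers yet again.
The consumer recreation is also why AUTO acknowledgement did not work: the threads hung while trying to set up enough consumers before the timeout.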
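A minimal sketch of how the startup loop could avoid the repeated recreation, assuming that a single addQueueNames call with several names restarts the consumers only once. The helper name registerQueuesInOneCall and its register parameter are hypothetical and not part of the original setup; the helper only gathers the names and hands them over in one call.

```kotlin
// Hypothetical helper (not from the original setup): collects the distinct
// queue names and passes them to `register` in a single call, instead of
// invoking `register` once per queue.
fun registerQueuesInOneCall(
    queueNames: Iterable<String>,
    register: (Array<String>) -> Unit,
): Int {
    val distinct = queueNames.distinct()
    // Skip the call entirely when there is nothing to add,
    // so the container is not touched for an empty batch.
    if (distinct.isNotEmpty()) register(distinct.toTypedArray())
    return distinct.size
}
```

With the container from the question, the call would look roughly like registerQueuesInOneCall(userRepository.findAll().map { queueName(it) }) { simpleContainerListener.addQueueNames(*it) }, since addQueueNames accepts a vararg of queue names.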
I solved this by adding a DirectMessageListenerContainer to handle the recently added queues. In the specific case of adding just one new consumer, it is very fast compared to SimpleMessageListenerContainer.
DirectMessageListenerContainer(connectionFactory).also {
    it.setConsumerTagStrategy { queueName -> consumerTag(queueName, RECENT_CONSUMER_TAG) }
    it.setMessageListener(ListenerFactory.create())
    it.setListenerId("queue-consumer-recent")
    it.setAdviceChain(retryOperationsInterceptor)
    it.setTaskExecutor(recentQueuesTaskExecutor)
    it.acknowledgeMode = AcknowledgeMode.AUTO
}
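The downside is that DirectMessageListenerContainer uses 1 thread per queue on each instance. That is exactly why I did not want to use it in the first place, but using DirectMessageListenerContainer for the recent queues and SimpleMessageListenerContainer for the queues that already exist significantly reduces the number of threads needed to handle them. As I understand it, overusing DirectMessageListenerContainer will eventually lead to OOM, so the next step could be to move queues from the direct container to the simple container in batches.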
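The batch move could be sketched as follows. This is only an illustration under assumptions: migrateInBatches, its parameters, and the batch size are hypothetical, and the two callbacks stand in for removeQueueNames on the direct container and addQueueNames on the simple one. Each batch would still trigger one consumer restart on the simple container, so larger batches mean fewer restarts.

```kotlin
// Hypothetical helper: moves queue names from one container to another
// in fixed-size batches and returns the number of batches processed.
fun migrateInBatches(
    queueNames: List<String>,
    batchSize: Int,
    removeFromDirect: (Array<String>) -> Unit,
    addToSimple: (Array<String>) -> Unit,
): Int {
    require(batchSize > 0) { "batchSize must be positive" }
    val batches = queueNames.chunked(batchSize)
    for (batch in batches) {
        val names = batch.toTypedArray()
        // Stop consuming these queues in the direct container first,
        // so the two containers do not consume the same queue at once.
        removeFromDirect(names)
        // Then register the whole batch with the simple container in one call.
        addToSimple(names)
    }
    return batches.size
}
```

A call might look like migrateInBatches(recentQueues, 25, { directContainer.removeQueueNames(*it) }, { simpleContainerListener.addQueueNames(*it) }), where recentQueues and directContainer are assumed names. Messages that arrive between the two steps simply wait in the queue until the simple container's consumers are back.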