Python: Kombu+RabbitMQ 死锁 - 队列要么阻塞要么阻塞
Python: Kombu+RabbitMQ Deadlock - queues are either blocked or blocking
问题
我有一个 RabbitMQ Server 作为我的一个系统的队列中心。在过去一周左右的时间里,它的生产商每隔几个小时就会完全停止生产。
我尝试了什么
蛮力
- 停止消费者释放锁几分钟,但随后阻塞returns。
- 重新启动 RabbitMQ 解决了几个小时的问题。
- 我有一些自动脚本可以执行丑陋的重启,但显然它远不是一个合适的解决方案。
分配更多内存
关注 , I have increased the memory allocated to RabbitMQ to 90%。服务器有高达 16GB 的内存,消息计数不是很高(每天数百万),所以这似乎不是问题。
从命令行:
sudo rabbitmqctl set_vm_memory_high_watermark 0.9
与 /etc/rabbitmq/rabbitmq.config
:
[
{rabbit,
[
{loopback_users, []},
{vm_memory_high_watermark, 0.9}
]
}
].
代码与设计
我对所有消费者和生产者使用Python。
制作人
生产者是 API 服务调用的服务器。每当有呼叫到达时,都会打开连接,发送消息并关闭连接。
from kombu import Connection
def send_message_to_queue(host, port, queue_name, message):
"""Sends a single message to the queue."""
with Connection('amqp://guest:guest@%s:%s//' % (host, port)) as conn:
simple_queue = conn.SimpleQueue(name=queue_name, no_ack=True)
simple_queue.put(message)
simple_queue.close()
消费者
消费者之间略有不同,但通常使用以下模式 - 打开一个连接,并等待它直到消息到达。连接可以长时间保持打开状态(比如几天)。
with Connection('amqp://whatever:whatever@whatever:whatever//') as conn:
while True:
queue = conn.SimpleQueue(queue_name)
message = queue.get(block=True)
message.ack()
设计推理
- 消费者始终需要与队列服务器保持打开的连接
- Producer 会话应该只在 API 调用的生命周期内存在
这一设计直到大约一周前才出现问题。
Web 视图仪表板
Web 控制台显示 127.0.0.1
和 172.31.38.50
中的消费者阻止了 172.31.38.50
、172.31.39.120
、172.31.41.38
和 [=20= 中的消费者].
系统指标
为了安全起见,我检查了服务器负载。正如预期的那样,平均负载和 CPU 利用率指标很低。
为什么兔子MQ每次都这样死锁?
我写这个作为答案,部分是因为它可能有帮助,部分是因为它太大而不能作为评论。
首先,很抱歉错过了这个 message = queue.get(block=True)
。还有一个免责声明——我不熟悉 python 也不熟悉 PIKA API.
AMQP's basic.get
is actually synchronous 而您正在设置 block=true
。正如我所说,不知道这在 PIKA 中意味着什么,但结合不断合并队列,听起来效率不高。因此,无论出于何种原因,发布者都可能因消费者阻止队列访问而拒绝连接。它实际上完全符合您通过 Stopping the consumers releases the lock for a few minutes, but then blocking returns.
临时解决问题的方式
我建议尝试使用 AMQP 的 basic.consume
而不是 basic.get
。我不知道获取的动机是什么,但在大多数情况下(无论如何我的经验)你应该使用消费。仅引用上述 link
This method provides a direct access to the messages in a queue using
a synchronous dialogue that is designed for specific types of
application where synchronous functionality is more important than
performance.
在 RabbitMQ docs 中,它表示当代理资源不足时连接会被阻塞,但正如您所写的那样,负载非常低。为了安全起见,您可以检查内存消耗和可用磁盘space。
这很可能是由 RabbitMQ 3.6.2 的管理模块中的内存泄漏引起的。这现已在 RabbitMQ 3.6.3 中修复,并且可用 here.
问题本身已描述 here, but is also discussed extensively on the RabbitMQ messages boards; for example here and here. This has also been known to cause a lot of weird issues, a good example is the issue reported here。
作为新版本发布前的临时修复,您可以升级到新的 est 版本、降级到 3.6.1 或完全禁用管理模块。
问题
我有一个 RabbitMQ Server 作为我的一个系统的队列中心。在过去一周左右的时间里,它的生产商每隔几个小时就会完全停止生产。
我尝试了什么
蛮力
- 停止消费者释放锁几分钟,但随后阻塞returns。
- 重新启动 RabbitMQ 解决了几个小时的问题。
- 我有一些自动脚本可以执行丑陋的重启,但显然它远不是一个合适的解决方案。
分配更多内存
关注
从命令行:
sudo rabbitmqctl set_vm_memory_high_watermark 0.9
与 /etc/rabbitmq/rabbitmq.config
:
[
{rabbit,
[
{loopback_users, []},
{vm_memory_high_watermark, 0.9}
]
}
].
代码与设计
我对所有消费者和生产者使用Python。
制作人
生产者是 API 服务调用的服务器。每当有呼叫到达时,都会打开连接,发送消息并关闭连接。
from kombu import Connection
def send_message_to_queue(host, port, queue_name, message):
"""Sends a single message to the queue."""
with Connection('amqp://guest:guest@%s:%s//' % (host, port)) as conn:
simple_queue = conn.SimpleQueue(name=queue_name, no_ack=True)
simple_queue.put(message)
simple_queue.close()
消费者
消费者之间略有不同,但通常使用以下模式 - 打开一个连接,并等待它直到消息到达。连接可以长时间保持打开状态(比如几天)。
with Connection('amqp://whatever:whatever@whatever:whatever//') as conn:
while True:
queue = conn.SimpleQueue(queue_name)
message = queue.get(block=True)
message.ack()
设计推理
- 消费者始终需要与队列服务器保持打开的连接
- Producer 会话应该只在 API 调用的生命周期内存在
这一设计直到大约一周前才出现问题。
Web 视图仪表板
Web 控制台显示 127.0.0.1
和 172.31.38.50
中的消费者阻止了 172.31.38.50
、172.31.39.120
、172.31.41.38
和 [=20= 中的消费者].
系统指标
为了安全起见,我检查了服务器负载。正如预期的那样,平均负载和 CPU 利用率指标很低。
为什么兔子MQ每次都这样死锁?
我写这个作为答案,部分是因为它可能有帮助,部分是因为它太大而不能作为评论。
首先,很抱歉错过了这个 message = queue.get(block=True)
。还有一个免责声明——我不熟悉 python 也不熟悉 PIKA API.
AMQP's basic.get
is actually synchronous 而您正在设置 block=true
。正如我所说,不知道这在 PIKA 中意味着什么,但结合不断合并队列,听起来效率不高。因此,无论出于何种原因,发布者都可能因消费者阻止队列访问而拒绝连接。它实际上完全符合您通过 Stopping the consumers releases the lock for a few minutes, but then blocking returns.
我建议尝试使用 AMQP 的 basic.consume
而不是 basic.get
。我不知道获取的动机是什么,但在大多数情况下(无论如何我的经验)你应该使用消费。仅引用上述 link
This method provides a direct access to the messages in a queue using a synchronous dialogue that is designed for specific types of application where synchronous functionality is more important than performance.
在 RabbitMQ docs 中,它表示当代理资源不足时连接会被阻塞,但正如您所写的那样,负载非常低。为了安全起见,您可以检查内存消耗和可用磁盘space。
这很可能是由 RabbitMQ 3.6.2 的管理模块中的内存泄漏引起的。这现已在 RabbitMQ 3.6.3 中修复,并且可用 here.
问题本身已描述 here, but is also discussed extensively on the RabbitMQ messages boards; for example here and here. This has also been known to cause a lot of weird issues, a good example is the issue reported here。
作为新版本发布前的临时修复,您可以升级到新的 est 版本、降级到 3.6.1 或完全禁用管理模块。