芹菜任务 "Received" 是什么意思?当所有 celery worker 都被阻塞时,不是 "Received" 的新任务发生了什么?
What does it mean for a celery task to be "Received"? When all celery workers are blocked, what is happening with new tasks that are not "Received"?
我正在开发一个新的监控系统,该系统可以测量 Celery 队列吞吐量并在队列备份时帮助提醒团队。在我的工作过程中,我遇到了一些我不理解的特殊行为(并且在 Celery 规范中没有详细记录)。
出于测试目的,我设置了一个端点,它将使用 16 个 运行 长任务填充队列,这些任务可用于模拟备份队列。框架是 Flask,Queue broker 是 Redis。 Celery 被配置为每个 worker 最多并行处理 4 个任务,我有 2 个 worker 运行.
api/health.py
def health():
health = Blueprint("health", __name__)
@health.route("/api/debug/create-long-queue", methods=["GET"])
def long_queue():
for i in range(16):
sleepy_job.delay()
return make_response({}, 200)
return health
jobs.py
@celery.task(priority=HIGH_PRIORITY)
def sleepy_job(*args, **kwargs):
time.sleep(30)
这是我模拟备份生产队列的方法:
- 我调用
/api/debug/create-long-queue
来模拟队列中的备份。根据上面的计算,工人应该忙着睡1分钟(加在一起,他们一次可以同时处理8个任务。每个任务只睡30秒,总共有16个任务。)
- 我在不久之后(< 5 秒)进行了另一个 API 调用,它启动了具有真实业务逻辑的不同工作(处理入站 webhook API 调用)。我们称此作业为
handle_incoming_message
.
这是我使用 flower 检查队列的结果:
- 虽然所有工作人员都被前 8 个
sleepy_job
任务阻塞,但我在队列中看不到新 handle_incoming_message
的迹象,即使我确定 handle_incoming_message.delay()
已被调用作为第二次 API 调用的结果。
- 前 8 个
sleepy_job
任务完成后(~30 秒),我在队列中看到状态为 RECIEVED
的新 handle_incoming_message
。
- 在第二个(也是最后一个)8
sleepy_job
任务完成后,我现在看到 handle_incoming_message
的状态为 STARTED
(我可以确认这是 UI 使用在该任务中接收和处理的新数据进行更新。)
问题
所以很明显,当工作人员在处理前 8 个 sleepy_job
任务后暂时解除阻塞时,他们正在做 某事 到 mark/acknowledge 新任务handle_incoming_message
以花可见的方式完成任务。 但这留下了几个悬而未决的问题:
- 当 worker 被阻塞时,新
handle_incoming_message
任务的状态是什么?
- worker 解除阻塞后有什么变化使 flower 现在可以看到新的
handle_incoming_message
任务?
- “已接收”状态的实际含义是什么?
- (奖励:我如何才能看到在工作人员被阻塞时排队的任务?)
当所有 worker 都被阻塞时,由于预取,某些任务可能处于已接收状态(请查看相关文档)。因此,您的任务很可能只是在队列中,等待 Celery 工作人员(协调进程 - 这些不是实际的工作进程)接收。
Flower 是一个简单的服务,它建立在称为“任务事件”的 Celery 功能之上。简单来说,它 (Flower) 将自己订阅为所有事件(接收、成功、开始、失败等)的接收者,然后将这些事件可视化地呈现给 Web 客户端。 More about it here。因此,当 Celery worker 收到任务时,将发送一个“task-received”事件。 Flower 获取此事件,并在仪表板中更改该任务的状态。
当一个任务被“接收”时,这意味着特定的 Celery worker 将该任务从队列中取出并且它可能会立即执行(如果有空闲 worker-process 来执行它) ,或者 Celery worker 将等待 worker 进程准备好 运行 任务。我已经提到过预取 - Celery 工作人员通常会承担比可用任务更多的任务 worker-processes.
Celery 没有为用户提供列出特定队列中的内容的方法。这就是为什么您会看到许多类似的问题 - 包括 this one which offers answers。你会在那里看到我的简短回答。简而言之,这取决于您选择的经纪人。如果是 Redis,那么您只需浏览对象列表即可。如果是 RabbitMQ,那么您可以使用他们的工具来检查队列。我认为不提供此信息的决定是好的,因为此信息永远不可靠。当您列出特定队列中的所有任务时,可能会有数千个新任务...
我正在开发一个新的监控系统,该系统可以测量 Celery 队列吞吐量并在队列备份时帮助提醒团队。在我的工作过程中,我遇到了一些我不理解的特殊行为(并且在 Celery 规范中没有详细记录)。
出于测试目的,我设置了一个端点,它将使用 16 个 运行 长任务填充队列,这些任务可用于模拟备份队列。框架是 Flask,Queue broker 是 Redis。 Celery 被配置为每个 worker 最多并行处理 4 个任务,我有 2 个 worker 运行.
api/health.py
def health():
health = Blueprint("health", __name__)
@health.route("/api/debug/create-long-queue", methods=["GET"])
def long_queue():
for i in range(16):
sleepy_job.delay()
return make_response({}, 200)
return health
jobs.py
@celery.task(priority=HIGH_PRIORITY)
def sleepy_job(*args, **kwargs):
time.sleep(30)
这是我模拟备份生产队列的方法:
- 我调用
/api/debug/create-long-queue
来模拟队列中的备份。根据上面的计算,工人应该忙着睡1分钟(加在一起,他们一次可以同时处理8个任务。每个任务只睡30秒,总共有16个任务。) - 我在不久之后(< 5 秒)进行了另一个 API 调用,它启动了具有真实业务逻辑的不同工作(处理入站 webhook API 调用)。我们称此作业为
handle_incoming_message
.
这是我使用 flower 检查队列的结果:
- 虽然所有工作人员都被前 8 个
sleepy_job
任务阻塞,但我在队列中看不到新handle_incoming_message
的迹象,即使我确定handle_incoming_message.delay()
已被调用作为第二次 API 调用的结果。 - 前 8 个
sleepy_job
任务完成后(~30 秒),我在队列中看到状态为RECIEVED
的新handle_incoming_message
。 - 在第二个(也是最后一个)8
sleepy_job
任务完成后,我现在看到handle_incoming_message
的状态为STARTED
(我可以确认这是 UI 使用在该任务中接收和处理的新数据进行更新。)
问题
所以很明显,当工作人员在处理前 8 个 sleepy_job
任务后暂时解除阻塞时,他们正在做 某事 到 mark/acknowledge 新任务handle_incoming_message
以花可见的方式完成任务。 但这留下了几个悬而未决的问题:
- 当 worker 被阻塞时,新
handle_incoming_message
任务的状态是什么? - worker 解除阻塞后有什么变化使 flower 现在可以看到新的
handle_incoming_message
任务? - “已接收”状态的实际含义是什么?
- (奖励:我如何才能看到在工作人员被阻塞时排队的任务?)
当所有 worker 都被阻塞时,由于预取,某些任务可能处于已接收状态(请查看相关文档)。因此,您的任务很可能只是在队列中,等待 Celery 工作人员(协调进程 - 这些不是实际的工作进程)接收。
Flower 是一个简单的服务,它建立在称为“任务事件”的 Celery 功能之上。简单来说,它 (Flower) 将自己订阅为所有事件(接收、成功、开始、失败等)的接收者,然后将这些事件可视化地呈现给 Web 客户端。 More about it here。因此,当 Celery worker 收到任务时,将发送一个“task-received”事件。 Flower 获取此事件,并在仪表板中更改该任务的状态。
当一个任务被“接收”时,这意味着特定的 Celery worker 将该任务从队列中取出并且它可能会立即执行(如果有空闲 worker-process 来执行它) ,或者 Celery worker 将等待 worker 进程准备好 运行 任务。我已经提到过预取 - Celery 工作人员通常会承担比可用任务更多的任务 worker-processes.
Celery 没有为用户提供列出特定队列中的内容的方法。这就是为什么您会看到许多类似的问题 - 包括 this one which offers answers。你会在那里看到我的简短回答。简而言之,这取决于您选择的经纪人。如果是 Redis,那么您只需浏览对象列表即可。如果是 RabbitMQ,那么您可以使用他们的工具来检查队列。我认为不提供此信息的决定是好的,因为此信息永远不可靠。当您列出特定队列中的所有任务时,可能会有数千个新任务...