Celery Task 自定义跟踪方式
Celery Task Custom tracking method
我的主要问题在于我需要知道任务是否仍在排队、开始或撤销。
我不能用 celery 和 redis 执行此操作,因为在结果进入 redis 24 小时后,它们将被删除。
我有一些想法,但我认为最可靠的想法是拥有一个数据库跟踪并手动添加我需要的用户正在执行的任务的关键信息 运行ning。
有一些方法可以 运行 在任务开始之前,我也可以在创建任务或撤销任务时手动使用数据库,对吗?我不会为每个任务创建一个新行,而是为每个用户更新一行,因为我只对每个用户的最后一个任务感兴趣。
您可能必须结合多种方法。如果你的结果在后端过期(这是合理的),你必须使用不同的存储,比如数据库来长期归档任务的状态。首先,您可以启用 task_track_started
以便任务在 worker 开始执行时报告 STARTED
状态)。然后定期检查结果后端以获取未处于就绪状态(SUCCESS
、FAILURE
和 REVOKED
)的任务的状态更新。如果它们处于最终状态,则使用 forget()
方法从后端删除结果。
唯一的问题是已撤销的任务。如果没有可用的工作人员,则撤销任务无效(这就是为什么在调用撤销时应始终等待回复)。如果工作人员很忙,因此任务仍保留在消息队列中,工作人员只是注意到当他们从队列中取出该任务时应该丢弃该任务,但它仅存储在工作人员的状态中。一旦他们接受了它,他们就会放弃任务,结果最终包含 REVOKED
状态。关键是要注意,被撤销的任务只保留在工人的状态,所以你应该使用 --statedb
参数来保持状态,以防工人崩溃。否则,已经撤销的任务将愉快地由同一个或另一个工作人员处理。
我猜你最好的选择是调用 revoke 命令,如果你得到工作人员的回复,请将数据库中任务的内部状态设置为 FLAGGED_REVOKED
之类的东西。在状态更新循环中,只有当它不是 PENDING
.
时才更新被撤销任务的状态
我有一个简单的作业调度应用程序,它使用 APScheduler 作为调度程序,使用 Celery 作为执行层。有关作业、作业 运行 和日程安排的信息保存在 MongoDB 中。这是我用来取消作业的代码:
database = scheduler._jobstores['default'].collection.database
collection = database['runs']
run = collection.find_one({'job_id': job_id, '_id': run_id})
if run.get('task_state') in ('PENDING', 'RECEIVED', 'STARTED', 'RETRY'):
reply = celery.control.revoke(run['task_id'], terminate=terminate, reply=True)
if reply:
collection.update_one({'_id': run['_id']},
{'$set': {'task_state': 'FLAGGED_REVOKED'}})
else:
raise Exception('Failed to revoke the task (no reply received)')
else:
raise Exception('Job execution cannot be canceled')
这是我的状态更新代码(作为内部 APScheduler 作业保存到 运行 每隔几秒):
database = scheduler._jobstores['default'].collection.database
collection = database['runs']
runs = collection.find({
'task_id': {'$exists': True},
'task_state': {'$nin': ['SUCCESS', 'FAILURE', 'REVOKED']}
})
for run in runs:
result = AsyncResult(run['task_id'],
backend=celery.backend, app=celery)
if run['task_state'] == 'FLAGGED_REVOKED' and result.state == 'PENDING':
update = {'task_state': 'FLAGGED_REVOKED'}
else:
update = {'task_state': result.state}
if result.state == 'FAILURE':
update['exception'] = str(result.result)
update['traceback'] = result.traceback
elif result.state == 'SUCCESS':
update['result'] = result.result
if result.date_done:
date_done = dateparser.parse(result.date_done) \
if isinstance(result.date_done, str) else result.date_done
update['finish_time'] = date_done
try:
collection.update_one({'_id': run['_id']}, {'$set': update})
except Exception as e:
print('Failed to update task status: %s', str(e))
else:
if result.state in ['SUCCESS', 'FAILURE', 'REVOKED']:
result.forget()
我的主要问题在于我需要知道任务是否仍在排队、开始或撤销。
我不能用 celery 和 redis 执行此操作,因为在结果进入 redis 24 小时后,它们将被删除。
我有一些想法,但我认为最可靠的想法是拥有一个数据库跟踪并手动添加我需要的用户正在执行的任务的关键信息 运行ning。
有一些方法可以 运行 在任务开始之前,我也可以在创建任务或撤销任务时手动使用数据库,对吗?我不会为每个任务创建一个新行,而是为每个用户更新一行,因为我只对每个用户的最后一个任务感兴趣。
您可能必须结合多种方法。如果你的结果在后端过期(这是合理的),你必须使用不同的存储,比如数据库来长期归档任务的状态。首先,您可以启用 task_track_started
以便任务在 worker 开始执行时报告 STARTED
状态)。然后定期检查结果后端以获取未处于就绪状态(SUCCESS
、FAILURE
和 REVOKED
)的任务的状态更新。如果它们处于最终状态,则使用 forget()
方法从后端删除结果。
唯一的问题是已撤销的任务。如果没有可用的工作人员,则撤销任务无效(这就是为什么在调用撤销时应始终等待回复)。如果工作人员很忙,因此任务仍保留在消息队列中,工作人员只是注意到当他们从队列中取出该任务时应该丢弃该任务,但它仅存储在工作人员的状态中。一旦他们接受了它,他们就会放弃任务,结果最终包含 REVOKED
状态。关键是要注意,被撤销的任务只保留在工人的状态,所以你应该使用 --statedb
参数来保持状态,以防工人崩溃。否则,已经撤销的任务将愉快地由同一个或另一个工作人员处理。
我猜你最好的选择是调用 revoke 命令,如果你得到工作人员的回复,请将数据库中任务的内部状态设置为 FLAGGED_REVOKED
之类的东西。在状态更新循环中,只有当它不是 PENDING
.
我有一个简单的作业调度应用程序,它使用 APScheduler 作为调度程序,使用 Celery 作为执行层。有关作业、作业 运行 和日程安排的信息保存在 MongoDB 中。这是我用来取消作业的代码:
database = scheduler._jobstores['default'].collection.database
collection = database['runs']
run = collection.find_one({'job_id': job_id, '_id': run_id})
if run.get('task_state') in ('PENDING', 'RECEIVED', 'STARTED', 'RETRY'):
reply = celery.control.revoke(run['task_id'], terminate=terminate, reply=True)
if reply:
collection.update_one({'_id': run['_id']},
{'$set': {'task_state': 'FLAGGED_REVOKED'}})
else:
raise Exception('Failed to revoke the task (no reply received)')
else:
raise Exception('Job execution cannot be canceled')
这是我的状态更新代码(作为内部 APScheduler 作业保存到 运行 每隔几秒):
database = scheduler._jobstores['default'].collection.database
collection = database['runs']
runs = collection.find({
'task_id': {'$exists': True},
'task_state': {'$nin': ['SUCCESS', 'FAILURE', 'REVOKED']}
})
for run in runs:
result = AsyncResult(run['task_id'],
backend=celery.backend, app=celery)
if run['task_state'] == 'FLAGGED_REVOKED' and result.state == 'PENDING':
update = {'task_state': 'FLAGGED_REVOKED'}
else:
update = {'task_state': result.state}
if result.state == 'FAILURE':
update['exception'] = str(result.result)
update['traceback'] = result.traceback
elif result.state == 'SUCCESS':
update['result'] = result.result
if result.date_done:
date_done = dateparser.parse(result.date_done) \
if isinstance(result.date_done, str) else result.date_done
update['finish_time'] = date_done
try:
collection.update_one({'_id': run['_id']}, {'$set': update})
except Exception as e:
print('Failed to update task status: %s', str(e))
else:
if result.state in ['SUCCESS', 'FAILURE', 'REVOKED']:
result.forget()