如何在计划任务的情况下从 celery 获取任务 ID (beat)
How to get task id from celery in case of scheduled tasks (beat)
要访问 celery 任务的信息,我需要 task_id。当手动启动 celery 任务时,我可以使用 task.id 轻松获取此任务的 ID(并将其写入数据库或执行其他操作)。
如果我使用定期向工作人员发送任务的 celery-beat,那似乎是不可能的。
所以我的问题是,如何在beat将任务发送给celery的worker的那一刻从任务中获取id?
在工作人员收到任务的那一刻,控制台显示任务ID。所以我担心的是,在任务被 beat 发送给 worker 的那一刻,它没有任务 ID。
手动案例获取task_id:
task = tasks.LongRunningTask.delay(username_from_formTargetsLaden, password_from_formTargetsLaden, url_from_formTargetsLaden)
task_id = task.id
也许你们中的一些人有想法?
我找到了那个小问题的答案:
如果您需要最初由 beat 发送的任务的任务 ID,您可以简单地向您的(计划的)worker 任务添加一个检查功能。
配置Periodic-Task
这是 "reminds" celery 每天在 11:08 am (UTC) 开始任务的时间表。
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
test = sender.add_periodic_task(crontab(minute=8, hour=11), CheckLists.s(app.config['USR'], app.config['PWD']))
要定期执行的任务
这是celery在收到来自beat的"reminder"后由celery执行的计划任务。
@celery.task(bind=True)
def CheckLists(self, arg1, arg2):
#get task_id von scheduled Task 'Check-List'
i = inspect()
activetasks = i.active()
list_of_tasks = {'activetasks': activetasks}
task_id = list_of_tasks['activetasks']['celery@DESKTOP-XXXXX'][0]['id'] #adapt this section depending on environment (local, webserver, etc...)
task_type = "CHECK_LISTS"
task_id_to_db = Tasks(task_id, task_type)
db.session.add(task_id_to_db)
db.session.commit()
long_runnning_task
[...more task relevant code here...]
所以我正在利用 app.control.inspect
来检查 运行 工人。它在后台使用远程控制命令。
使用 i.active()
你会得到一个字典,你可以很容易地解析它。
只要我没有找到任何文档如何更轻松地从周期性任务中获取 task_id,我就会坚持使用该解决方案。
保存任务 ID 后,您可以通过 AJAX 轻松轮询任务状态等。
希望对大家有所帮助:)
要访问 celery 任务的信息,我需要 task_id。当手动启动 celery 任务时,我可以使用 task.id 轻松获取此任务的 ID(并将其写入数据库或执行其他操作)。 如果我使用定期向工作人员发送任务的 celery-beat,那似乎是不可能的。
所以我的问题是,如何在beat将任务发送给celery的worker的那一刻从任务中获取id?
在工作人员收到任务的那一刻,控制台显示任务ID。所以我担心的是,在任务被 beat 发送给 worker 的那一刻,它没有任务 ID。
手动案例获取task_id:
task = tasks.LongRunningTask.delay(username_from_formTargetsLaden, password_from_formTargetsLaden, url_from_formTargetsLaden)
task_id = task.id
也许你们中的一些人有想法?
我找到了那个小问题的答案:
如果您需要最初由 beat 发送的任务的任务 ID,您可以简单地向您的(计划的)worker 任务添加一个检查功能。
配置Periodic-Task
这是 "reminds" celery 每天在 11:08 am (UTC) 开始任务的时间表。
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
test = sender.add_periodic_task(crontab(minute=8, hour=11), CheckLists.s(app.config['USR'], app.config['PWD']))
要定期执行的任务
这是celery在收到来自beat的"reminder"后由celery执行的计划任务。
@celery.task(bind=True)
def CheckLists(self, arg1, arg2):
#get task_id von scheduled Task 'Check-List'
i = inspect()
activetasks = i.active()
list_of_tasks = {'activetasks': activetasks}
task_id = list_of_tasks['activetasks']['celery@DESKTOP-XXXXX'][0]['id'] #adapt this section depending on environment (local, webserver, etc...)
task_type = "CHECK_LISTS"
task_id_to_db = Tasks(task_id, task_type)
db.session.add(task_id_to_db)
db.session.commit()
long_runnning_task
[...more task relevant code here...]
所以我正在利用 app.control.inspect
来检查 运行 工人。它在后台使用远程控制命令。
使用 i.active()
你会得到一个字典,你可以很容易地解析它。
只要我没有找到任何文档如何更轻松地从周期性任务中获取 task_id,我就会坚持使用该解决方案。
保存任务 ID 后,您可以通过 AJAX 轻松轮询任务状态等。
希望对大家有所帮助:)