如何在芹菜中安排链式任务
How to schedule a chain task in celery
我想运行一个由节拍安排的复杂任务。让我们假设默认的 add/mul 任务已定义。
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(
crontab(),
add.s(2,3) | mul.s(2)
)
但这将 return worker 中的一个错误:
NotImplementedError: chain is not a real task
如何用 celery beat 安排一个重要的任务?
一种方法是在 celeryconfig
中安排 beat_schedule
中的任务链,使用 link
选项,celery_tasks
这是一个模块名称,您的任务已定义
from celery.schedules import crontab
from celery import signature
beat_schedule = {
'chained': {
'task': 'celery_tasks.add',
'schedule': crontab(),
'options': {
'queue': 'default',
'link': signature('celery_tasks.mul',
args=(),
kwargs={},
options={
'link': signature('celery_tasks.another_task',
args=(),
kwargs={},
queue='default')
},
queue='default')
},
'args': ()
}
}
为了添加链式周期性任务,您可以在声明链时使用@app.task,然后在 add_periodic_task() 方法上添加此新任务。示例:
@app.on_after_finalize.connect ->i use this because it`s declared on task.py
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(timedelta(minutes=10), chian_st22.s(),name='test')
@app.task
def chian_st22(): -> i create the task with chain
cadena = chain(st22.s(), mailer.s()).apply_async()
@app.task
def mailer(data):
clase = CheckAlert()
mail = clase.envio_mail(data)
return mail
@app.task
def st22():
clase = CheckAlert()
st = clase.check_st22_dumps()
return st
我想运行一个由节拍安排的复杂任务。让我们假设默认的 add/mul 任务已定义。
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(
crontab(),
add.s(2,3) | mul.s(2)
)
但这将 return worker 中的一个错误:
NotImplementedError: chain is not a real task
如何用 celery beat 安排一个重要的任务?
一种方法是在 celeryconfig
中安排 beat_schedule
中的任务链,使用 link
选项,celery_tasks
这是一个模块名称,您的任务已定义
from celery.schedules import crontab
from celery import signature
beat_schedule = {
'chained': {
'task': 'celery_tasks.add',
'schedule': crontab(),
'options': {
'queue': 'default',
'link': signature('celery_tasks.mul',
args=(),
kwargs={},
options={
'link': signature('celery_tasks.another_task',
args=(),
kwargs={},
queue='default')
},
queue='default')
},
'args': ()
}
}
为了添加链式周期性任务,您可以在声明链时使用@app.task,然后在 add_periodic_task() 方法上添加此新任务。示例:
@app.on_after_finalize.connect ->i use this because it`s declared on task.py
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(timedelta(minutes=10), chian_st22.s(),name='test')
@app.task
def chian_st22(): -> i create the task with chain
cadena = chain(st22.s(), mailer.s()).apply_async()
@app.task
def mailer(data):
clase = CheckAlert()
mail = clase.envio_mail(data)
return mail
@app.task
def st22():
clase = CheckAlert()
st = clase.check_st22_dumps()
return st