python celery - 如何在运行时向 worker 添加 CELERYBEAT_SCHEDULE 任务?
python celery - how to add CELERYBEAT_SCHEDULE task at runtime to a worker?
我创建了一个 celery worker,它有一个 celerybeat 计划任务,运行s 以 5 秒的时间间隔。如何在不停止 celery worker 的情况下动态添加另一个节拍任务?
示例
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=3600,
CELERY_TIMEZONE = 'UTC',
CELERYBEAT_SCHEDULE = {
'long-run-5-secs': {
'task': 'test_proj.tasks.test',
'schedule': timedelta(seconds=5),
'args': (16, )
}
}
)
通过以上配置,我可以运行 成功地使用beat 模式的celery worker。
现在我需要动态添加以下节拍时间表:
'long-run-2-secs': {
'task': 'test_proj.tasks.test',
'schedule': timedelta(seconds=2),
'args': (14, ) },
谢谢
我一直在为同样的问题寻找解决方案。恐怕您将不得不等待 Celery ver.4.0。目前仅development版本支持动态任务调度:http://docs.celeryproject.org/en/master/userguide/periodic-tasks.html#beat-entries
一种可能的方法是将任务存储在数据库中并动态添加删除任务。您可以使用数据库支持的芹菜节拍调度程序。参考 https://django-celery-beat.readthedocs.io/en/latest/。 PeriodicTask 数据库存储周期性任务。您可以使用数据库命令(Django ORM)来操作周期性任务。
这就是我处理动态任务的方式(动态创建和停止任务)。
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
chon_schedule = CrontabSchedule.objects.create(minute='40', hour='08', day_of_week='*', day_of_month='*', month_of_year='*') # To create a cron schedule.
schedule = IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS) # To create a schedule to run everu 10 min.
PeriodicTask.objects.create(crontab=chon_schedule, name='name_to_identify_task',task='name_of_task') # It creates a entry in the database describing that periodic task (With cron schedule).
task = PeriodicTask.objects.create(interval=schedule, name='run for every 10 min', task='for_each_ten_min', ) # It creates a periodic task with interval schedule
Whenever you update a PeriodicTask a counter in this table is also
incremented, which tells the celery beat service to reload the
schedule from the database.
因此您无需重新启动或终止节拍。
如果您想在满足特定条件时停止任务,那么
periodic_task = PeriodicTask.objects.get(name='run for every 10 min')
periodic_task.enabled = False
periodic_task.save()
当 enabled 为 False 时,周期性任务变为空闲。您可以通过 enable = True
.
再次激活它
如果您不再需要该任务,则只需删除该条目即可。
我创建了一个 celery worker,它有一个 celerybeat 计划任务,运行s 以 5 秒的时间间隔。如何在不停止 celery worker 的情况下动态添加另一个节拍任务?
示例
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=3600,
CELERY_TIMEZONE = 'UTC',
CELERYBEAT_SCHEDULE = {
'long-run-5-secs': {
'task': 'test_proj.tasks.test',
'schedule': timedelta(seconds=5),
'args': (16, )
}
}
)
通过以上配置,我可以运行 成功地使用beat 模式的celery worker。
现在我需要动态添加以下节拍时间表:
'long-run-2-secs': {
'task': 'test_proj.tasks.test',
'schedule': timedelta(seconds=2),
'args': (14, ) },
谢谢
我一直在为同样的问题寻找解决方案。恐怕您将不得不等待 Celery ver.4.0。目前仅development版本支持动态任务调度:http://docs.celeryproject.org/en/master/userguide/periodic-tasks.html#beat-entries
一种可能的方法是将任务存储在数据库中并动态添加删除任务。您可以使用数据库支持的芹菜节拍调度程序。参考 https://django-celery-beat.readthedocs.io/en/latest/。 PeriodicTask 数据库存储周期性任务。您可以使用数据库命令(Django ORM)来操作周期性任务。
这就是我处理动态任务的方式(动态创建和停止任务)。
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
chon_schedule = CrontabSchedule.objects.create(minute='40', hour='08', day_of_week='*', day_of_month='*', month_of_year='*') # To create a cron schedule.
schedule = IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS) # To create a schedule to run everu 10 min.
PeriodicTask.objects.create(crontab=chon_schedule, name='name_to_identify_task',task='name_of_task') # It creates a entry in the database describing that periodic task (With cron schedule).
task = PeriodicTask.objects.create(interval=schedule, name='run for every 10 min', task='for_each_ten_min', ) # It creates a periodic task with interval schedule
Whenever you update a PeriodicTask a counter in this table is also incremented, which tells the celery beat service to reload the schedule from the database.
因此您无需重新启动或终止节拍。 如果您想在满足特定条件时停止任务,那么
periodic_task = PeriodicTask.objects.get(name='run for every 10 min')
periodic_task.enabled = False
periodic_task.save()
当 enabled 为 False 时,周期性任务变为空闲。您可以通过 enable = True
.
如果您不再需要该任务,则只需删除该条目即可。