使用 add_periodic_task 在 Celery (celerybeat) 中动态设置周期性任务
Setting up periodic tasks in Celery (celerybeat) dynamically using add_periodic_task
我正在使用 Celery 4.0.1
和 Django 1.10
,我在安排任务时遇到了问题(运行 一个任务工作正常)。这是芹菜配置:
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.conf.BROKER_URL = 'amqp://{}:{}@{}'.format(settings.AMQP_USER, settings.AMQP_PASSWORD, settings.AMQP_HOST)
app.conf.CELERY_DEFAULT_EXCHANGE = 'myapp.celery'
app.conf.CELERY_DEFAULT_QUEUE = 'myapp.celery_default'
app.conf.CELERY_TASK_SERIALIZER = 'json'
app.conf.CELERY_ACCEPT_CONTENT = ['json']
app.conf.CELERY_IGNORE_RESULT = True
app.conf.CELERY_DISABLE_RATE_LIMITS = True
app.conf.BROKER_POOL_LIMIT = 2
app.conf.CELERY_QUEUES = (
Queue('myapp.celery_default'),
Queue('myapp.queue1'),
Queue('myapp.queue2'),
Queue('myapp.queue3'),
)
然后在 tasks.py 我有:
@app.task(queue='myapp.queue1')
def my_task(some_id):
print("Doing something with", some_id)
在views.py我想安排这个任务:
def my_view(request, id):
app.add_periodic_task(10, my_task.s(id))
然后我执行命令:
sudo systemctl start rabbitmq.service
celery -A myapp.celery_app beat -l debug
celery worker -A myapp.celery_app
但任务从未安排。我在日志中没有看到任何内容。该任务正在运行,因为如果在我看来我这样做:
def my_view(request, id):
my_task.delay(id)
任务执行完毕
如果在我的配置文件中,如果我手动安排任务,就像这样它会起作用:
app.conf.CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'tasks.my_task',
'schedule': 10.0,
'args': (66,)
},
}
我无法动态安排任务。有什么想法吗?
编辑:(13/01/2018)
The latest release 4.1.0 have addressed the subject in this ticket #3958 and has been merged
实际上你不能在视图级别不定义周期性任务,因为节拍计划设置将首先加载并且无法在运行时重新计划:
The add_periodic_task()
function will add the entry to the beat_schedule setting behind the scenes, and the same setting can also can be used to set up periodic tasks manually:
app.conf.CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'tasks.my_task',
'schedule': 10.0,
'args': (66,)
},
}
这意味着如果你想使用 add_periodic_task()
它应该被包裹在 celery 应用程序级别的 on_after_configure
处理程序中并且运行时的任何修改都不会生效:
app = Celery()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(10, my_task.s(66))
如 doc 中所述,常规的 celerybeat 只是跟踪任务执行:
The default scheduler is the celery.beat.PersistentScheduler
, that simply keeps track of the last run times in a local shelve database file.
为了能够动态管理周期性任务并在运行时重新安排 celerybeat:
There’s also the django-celery-beat extension that stores the schedule in the Django database, and presents a convenient admin interface to manage periodic tasks at runtime.
任务将保存在 django 数据库中,调度程序可以在数据库级别的任务模型中更新。每当您更新定期任务时,此任务中的计数器 table 将递增,并告诉 celery beat 服务从数据库重新加载计划。
您可能的解决方案如下:
from django_celery_beat.models import PeriodicTask, IntervalSchedule
schedule= IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS)
task = PeriodicTask.objects.create(interval=schedule, name='any name', task='tasks.my_task', args=json.dumps([66]))
views.py
def update_task_view(request, id)
task = PeriodicTask.objects.get(name="task name") # if we suppose names are unique
task.args=json.dumps([id])
task.save()
我正在使用 Celery 4.0.1
和 Django 1.10
,我在安排任务时遇到了问题(运行 一个任务工作正常)。这是芹菜配置:
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.conf.BROKER_URL = 'amqp://{}:{}@{}'.format(settings.AMQP_USER, settings.AMQP_PASSWORD, settings.AMQP_HOST)
app.conf.CELERY_DEFAULT_EXCHANGE = 'myapp.celery'
app.conf.CELERY_DEFAULT_QUEUE = 'myapp.celery_default'
app.conf.CELERY_TASK_SERIALIZER = 'json'
app.conf.CELERY_ACCEPT_CONTENT = ['json']
app.conf.CELERY_IGNORE_RESULT = True
app.conf.CELERY_DISABLE_RATE_LIMITS = True
app.conf.BROKER_POOL_LIMIT = 2
app.conf.CELERY_QUEUES = (
Queue('myapp.celery_default'),
Queue('myapp.queue1'),
Queue('myapp.queue2'),
Queue('myapp.queue3'),
)
然后在 tasks.py 我有:
@app.task(queue='myapp.queue1')
def my_task(some_id):
print("Doing something with", some_id)
在views.py我想安排这个任务:
def my_view(request, id):
app.add_periodic_task(10, my_task.s(id))
然后我执行命令:
sudo systemctl start rabbitmq.service
celery -A myapp.celery_app beat -l debug
celery worker -A myapp.celery_app
但任务从未安排。我在日志中没有看到任何内容。该任务正在运行,因为如果在我看来我这样做:
def my_view(request, id):
my_task.delay(id)
任务执行完毕
如果在我的配置文件中,如果我手动安排任务,就像这样它会起作用:
app.conf.CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'tasks.my_task',
'schedule': 10.0,
'args': (66,)
},
}
我无法动态安排任务。有什么想法吗?
编辑:(13/01/2018)
The latest release 4.1.0 have addressed the subject in this ticket #3958 and has been merged
实际上你不能在视图级别不定义周期性任务,因为节拍计划设置将首先加载并且无法在运行时重新计划:
The
add_periodic_task()
function will add the entry to the beat_schedule setting behind the scenes, and the same setting can also can be used to set up periodic tasks manually:app.conf.CELERYBEAT_SCHEDULE = { 'add-every-30-seconds': { 'task': 'tasks.my_task', 'schedule': 10.0, 'args': (66,) }, }
这意味着如果你想使用 add_periodic_task()
它应该被包裹在 celery 应用程序级别的 on_after_configure
处理程序中并且运行时的任何修改都不会生效:
app = Celery()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(10, my_task.s(66))
如 doc 中所述,常规的 celerybeat 只是跟踪任务执行:
The default scheduler is the
celery.beat.PersistentScheduler
, that simply keeps track of the last run times in a local shelve database file.
为了能够动态管理周期性任务并在运行时重新安排 celerybeat:
There’s also the django-celery-beat extension that stores the schedule in the Django database, and presents a convenient admin interface to manage periodic tasks at runtime.
任务将保存在 django 数据库中,调度程序可以在数据库级别的任务模型中更新。每当您更新定期任务时,此任务中的计数器 table 将递增,并告诉 celery beat 服务从数据库重新加载计划。
您可能的解决方案如下:
from django_celery_beat.models import PeriodicTask, IntervalSchedule
schedule= IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS)
task = PeriodicTask.objects.create(interval=schedule, name='any name', task='tasks.my_task', args=json.dumps([66]))
views.py
def update_task_view(request, id)
task = PeriodicTask.objects.get(name="task name") # if we suppose names are unique
task.args=json.dumps([id])
task.save()