我怎样才能防止同一个任务在另一个任务上执行?
How can I prevent the same task executed one OVER another?
我有一项任务是从 API 中提取数据,然后对数据库 (SQLite) 执行系统自动化(删除、重命名等)。我将任务设置为每 5 分钟执行一次。但是,有时任务需要超过 5 分钟才能完成,因此两个任务 运行 并行。这不好,因为 SQLite 数据库在 0 分钟内锁定到任务。
我怎么也有,
- 除非上一个任务已经完成或者
否则不会执行该任务
- 第二个任务在0分钟任务完成后直接排队执行?
我试过使用全局布尔值来防止任务在 运行ning 时执行,就像这样。
automate_is_running = False
@periodic_task(run_every=timedelta(minutes=5))
def automate():
if not automate_is_running:
automate_is_running = True
automate_all()
automate_is_running = False
但是那个returnsUnboundLocalError: local variable 'automate_is_running' referenced before assignment
错误。我该怎么办?
首先,您必须在函数中使用 global
。但这仍然不能如你所愿,因为每个 Celery worker 都是它自己的进程,进程之间不共享数据。
您需要使用某种外部互斥体,例如磁盘上的文件或数据库或缓存中的条目。有一个使用 memcached 的示例 in the Celery cookbook。
与其定期执行,不如在开始时安排一次,并始终在任务结束时安排下一次执行。还要确保重试失败的任务。
错误 TypeError: automate() missing 1 required positional argument: 'self'
是因为 Celery task
装饰器创建了一个任务对象,因此您需要接受它的实例作为第一个参数 self
.
下面的代码将 运行 任务尽快完成一次,然后总是在任务成功完成后 300 秒。它也会在失败时尽快重试。
要立即再次触发任务,请将 self.apply_async(countdown=300)
替换为 self.delay()
或传递 countdown=0
.
@task
def automate(self):
try:
automate_all()
except Exception as exc:
raise self.retry(exc=exc)
else:
self.apply_async(countdown=300)
automate.delay()
尝试使用特定异常 class 而不是 Exception
。我不知道你的代码做了什么以及你期望的异常。
您可以使用 celery beat 设置调度程序,并在第一个任务结束时调用第二个任务。
celery.py(芹菜 2.3)
from django.conf import settings
from celery.schedules import crontab
settings.CELERYBEAT_SCHEDULE = {
'runs-every-5-minutes' : {
'task': 'automate',
'schedule': crontab(minute='*/5'),
'args' : (),
},
}
tasks.py:
from celery import task
@task(name='automate')
def automate():
automate_all()
run_second_task()
文档:
芹菜 2.3:
http://docs.celeryproject.org/en/v2.3.3/userguide/periodic-tasks.html#crontab-schedules
芹菜4.1
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules
我有一项任务是从 API 中提取数据,然后对数据库 (SQLite) 执行系统自动化(删除、重命名等)。我将任务设置为每 5 分钟执行一次。但是,有时任务需要超过 5 分钟才能完成,因此两个任务 运行 并行。这不好,因为 SQLite 数据库在 0 分钟内锁定到任务。
我怎么也有,
- 除非上一个任务已经完成或者 否则不会执行该任务
- 第二个任务在0分钟任务完成后直接排队执行?
我试过使用全局布尔值来防止任务在 运行ning 时执行,就像这样。
automate_is_running = False
@periodic_task(run_every=timedelta(minutes=5))
def automate():
if not automate_is_running:
automate_is_running = True
automate_all()
automate_is_running = False
但是那个returnsUnboundLocalError: local variable 'automate_is_running' referenced before assignment
错误。我该怎么办?
首先,您必须在函数中使用 global
。但这仍然不能如你所愿,因为每个 Celery worker 都是它自己的进程,进程之间不共享数据。
您需要使用某种外部互斥体,例如磁盘上的文件或数据库或缓存中的条目。有一个使用 memcached 的示例 in the Celery cookbook。
与其定期执行,不如在开始时安排一次,并始终在任务结束时安排下一次执行。还要确保重试失败的任务。
错误 TypeError: automate() missing 1 required positional argument: 'self'
是因为 Celery task
装饰器创建了一个任务对象,因此您需要接受它的实例作为第一个参数 self
.
下面的代码将 运行 任务尽快完成一次,然后总是在任务成功完成后 300 秒。它也会在失败时尽快重试。
要立即再次触发任务,请将 self.apply_async(countdown=300)
替换为 self.delay()
或传递 countdown=0
.
@task
def automate(self):
try:
automate_all()
except Exception as exc:
raise self.retry(exc=exc)
else:
self.apply_async(countdown=300)
automate.delay()
尝试使用特定异常 class 而不是 Exception
。我不知道你的代码做了什么以及你期望的异常。
您可以使用 celery beat 设置调度程序,并在第一个任务结束时调用第二个任务。
celery.py(芹菜 2.3)
from django.conf import settings
from celery.schedules import crontab
settings.CELERYBEAT_SCHEDULE = {
'runs-every-5-minutes' : {
'task': 'automate',
'schedule': crontab(minute='*/5'),
'args' : (),
},
}
tasks.py:
from celery import task
@task(name='automate')
def automate():
automate_all()
run_second_task()
文档:
芹菜 2.3: http://docs.celeryproject.org/en/v2.3.3/userguide/periodic-tasks.html#crontab-schedules
芹菜4.1 http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules