如何有条件地执行 celery 任务 python
how to execute celery tasks conditionally python
我是芹菜新手。我有一个芹菜任务,需要在满足条件时执行。否则几分钟后重试。从下面的代码中,我被困在如何在 else 条件下重试相同的任务?感谢您的帮助。
@app.task(bind=True,soft_time_limit=4 * 3600)
def task_message_queue(id, name=None, tid=None, src=None, dest=None, queue="MessageQueue"):
ThreadLocalStore().set_data({"id": id, "tid": tid, "name": name,"src": src, "dest":dest})
num_files = os.popen("find %s -type f | wc -l" % dest).read().strip().split('\n')[0]
if num_files < 20:
#Move files from src to destination
else:
#wait for 2 minutes and retry the task
你必须调用 retry
让 celery 重试任务,你可以设置倒计时,这样 celery 会等待那么多时间并重试任务。下面是从官方celery docs借来的代码。修改 @task
装饰器根据您的需要也 self.retry
from celery.task import task
@app.task(bind=True,soft_time_limit=4 * 3600)
def task_message_queue(self, id, name=None, tid=None, src=None, dest=None, queue="MessageQueue"):
ThreadLocalStore().set_data({"id": id, "tid": tid, "name": name,"src": src, "dest":dest})
num_files = os.popen("find %s -type f | wc -l" % dest).read().strip().split('\n')[0]
try:
if num_files < 20:
#Move files from src to destination
else:
raise SOME_EXCEPTION
#wait for 2 minutes and retry the task
except SOME_EXCEPTION as exc:
self.retry(exc=exc, countdown=TIME_TO_WAIT_BEFORE_RETRY)
除了重试之外,您还可以再次触发任务。
from celery.task import task
@app.task(bind=True,soft_time_limit=4 * 3600)
def task_message_queue(self, id, name=None, tid=None, src=None, dest=None, queue="MessageQueue"):
ThreadLocalStore().set_data({"id": id, "tid": tid, "name": name,"src": src, "dest":dest})
num_files = os.popen("find %s -type f | wc -l" % dest).read().strip().split('\n')[0]
if num_files < 20:
#Move files from src to destination
else:
# Trigger the task again in 120 seconds.
task_message_queue.apply_async(countdown=120)
我是芹菜新手。我有一个芹菜任务,需要在满足条件时执行。否则几分钟后重试。从下面的代码中,我被困在如何在 else 条件下重试相同的任务?感谢您的帮助。
@app.task(bind=True,soft_time_limit=4 * 3600)
def task_message_queue(id, name=None, tid=None, src=None, dest=None, queue="MessageQueue"):
ThreadLocalStore().set_data({"id": id, "tid": tid, "name": name,"src": src, "dest":dest})
num_files = os.popen("find %s -type f | wc -l" % dest).read().strip().split('\n')[0]
if num_files < 20:
#Move files from src to destination
else:
#wait for 2 minutes and retry the task
你必须调用 retry
让 celery 重试任务,你可以设置倒计时,这样 celery 会等待那么多时间并重试任务。下面是从官方celery docs借来的代码。修改 @task
装饰器根据您的需要也 self.retry
from celery.task import task
@app.task(bind=True,soft_time_limit=4 * 3600)
def task_message_queue(self, id, name=None, tid=None, src=None, dest=None, queue="MessageQueue"):
ThreadLocalStore().set_data({"id": id, "tid": tid, "name": name,"src": src, "dest":dest})
num_files = os.popen("find %s -type f | wc -l" % dest).read().strip().split('\n')[0]
try:
if num_files < 20:
#Move files from src to destination
else:
raise SOME_EXCEPTION
#wait for 2 minutes and retry the task
except SOME_EXCEPTION as exc:
self.retry(exc=exc, countdown=TIME_TO_WAIT_BEFORE_RETRY)
除了重试之外,您还可以再次触发任务。
from celery.task import task
@app.task(bind=True,soft_time_limit=4 * 3600)
def task_message_queue(self, id, name=None, tid=None, src=None, dest=None, queue="MessageQueue"):
ThreadLocalStore().set_data({"id": id, "tid": tid, "name": name,"src": src, "dest":dest})
num_files = os.popen("find %s -type f | wc -l" % dest).read().strip().split('\n')[0]
if num_files < 20:
#Move files from src to destination
else:
# Trigger the task again in 120 seconds.
task_message_queue.apply_async(countdown=120)