Django celery - Restart the task when it's completed
@app.task
def Task1():
    print("this is task 1")
    return "Task-1 Done"
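For example, I want the task above to restart once it has completed.
Manual successive calls
If you want to call the task several times and use the same task id each time, pass the task_id argument to apply_async.
Note that delay does not accept it, as the documentation states: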
delay(*args, **kwargs)
Star argument version of apply_async().
Does not support the extra options enabled by apply_async().
@app.task(bind=True)
def Task1(self):
    print(f"this is task 1 {self.request.id}")
>>> from tasks import Task1
>>> result = Task1.apply_async()
>>> result
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> result.id
'ba488582-9d7d-4bda-a19d-a2b0bf9b503f'
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>>
[2021-08-12 08:24:31,537: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:24:31,538: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:24:31,539: WARNING/ForkPoolWorker-4]
[2021-08-12 08:24:31,539: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.00041928999962692615s: None
[2021-08-12 08:25:00,608: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:00,609: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:00,609: WARNING/ForkPoolWorker-4]
[2021-08-12 08:25:00,609: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0002528750001147273s: None
[2021-08-12 08:25:06,137: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:06,139: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:06,139: WARNING/ForkPoolWorker-4]
[2021-08-12 08:25:06,139: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0003467680007815943s: None
[2021-08-12 08:25:10,537: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:10,539: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:10,539: WARNING/ForkPoolWorker-4]
[2021-08-12 08:25:10,539: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0006299719998423825s: None
- Every execution uses the same task id, here ba488582-9d7d-4bda-a19d-a2b0bf9b503f (also visible in the AsyncResult)
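As a rough illustration of why task_id has to go through apply_async, here is a toy model of how the two calls route their arguments. FakeTask is a hypothetical stand-in written for this sketch, not Celery's own class; it assumes only what the quoted documentation says, namely that delay is the star-argument version of apply_async.

```python
import uuid


class FakeTask:
    """Toy stand-in for a Celery task; it only models how arguments are routed."""

    def apply_async(self, args=(), kwargs=None, task_id=None):
        # Execution options such as task_id travel separately from the
        # arguments that are handed to the task function itself.
        return {
            "id": task_id or str(uuid.uuid4()),
            "args": tuple(args),
            "kwargs": dict(kwargs or {}),
        }

    def delay(self, *args, **kwargs):
        # Star-argument shortcut: everything passed here becomes a task argument.
        return self.apply_async(args, kwargs)


task = FakeTask()
via_apply_async = task.apply_async(task_id="fixed-id")
via_delay = task.delay(task_id="fixed-id")

print(via_apply_async["id"])  # fixed-id
print(via_delay["kwargs"])    # {'task_id': 'fixed-id'}
```

In this model, passing task_id to delay turns it into an ordinary keyword argument for the task function and the message still gets a freshly generated id, which is why the answer uses apply_async.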
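Automatic successive calls
If you want the task to keep restarting itself, here are some options. All of the options below recurse indefinitely. You will probably want a condition inside the task that decides when the loop ends, for example an input argument that the task checks to determine whether execution should stop.
Option 1: Have the task call itself asynchronously from within its own body. This is somewhat like recursion, and it reuses the same task id as in the manual successive calls above.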
import time

@app.task(bind=True)
def Task1(self):
    print(f"this is task 1 {self.request.id}")
    time.sleep(2)
    print("re-trigger task 1")
    Task1.apply_async(task_id=self.request.id)
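The termination condition mentioned earlier could look roughly like the following. This is a Celery-free sketch: run_once, drive, and the remaining_runs argument are hypothetical names chosen for illustration, and the deque merely stands in for the broker queue so the logic can be run on its own.

```python
from collections import deque


def run_once(remaining_runs):
    """Body of a hypothetical self-restarting task.

    Returns the kwargs for the next run, or None when the loop should end.
    """
    print(f"this is task 1, {remaining_runs} run(s) left")
    if remaining_runs <= 1:
        return None  # stop condition reached: do not re-trigger
    return {"remaining_runs": remaining_runs - 1}


def drive(initial_runs):
    """Stand-in for broker plus worker: execute queued messages until none remain."""
    queue = deque([{"remaining_runs": initial_runs}])
    executed = 0
    while queue:
        kwargs = queue.popleft()
        executed += 1
        follow_up = run_once(**kwargs)
        if follow_up is not None:
            # In a real task this would be the apply_async call that
            # re-triggers the task, with follow_up passed as its kwargs.
            queue.append(follow_up)
    return executed


print(drive(3))  # 3
```

The same idea carries over to the task itself: pass the counter (or any other state) as an argument and skip the re-trigger once the condition is met.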
Option 2: Trigger the retry mechanism that Celery provides. This reuses the same task id on the same queue, as documented:
When you call retry it’ll send a new message, using the same task-id,
and it’ll take care to make sure the message is delivered to the same
queue as the originating task.
We can verify this by displaying the task id via self.request.id.
@app.task(
    bind=True,
    default_retry_delay=0.1,
    retry_backoff=False,
    max_retries=None,
)
def Task1(self):
    print(f"this is task 1 {self.request.id}")
    time.sleep(2)
    print("re-trigger task 1")
    raise self.retry()
Option 3: Retry only in a specific scenario (here, when RestartTaskNeeded is raised). As with option 2, this reuses the same task id on the same queue.
class RestartTaskNeeded(Exception):
    pass

@app.task(
    bind=True,
    autoretry_for=(RestartTaskNeeded,),
    default_retry_delay=0.1,
    retry_backoff=False,
    max_retries=None,
)
def Task1(self):
    print(f"this is task 1 {self.request.id}")
    time.sleep(2)
    print("re-trigger task 1")
    raise RestartTaskNeeded
Output:
>>> from tasks import Task1
>>> Task1.apply_async()
<AsyncResult: 999e9de0-292f-412d-a9a8-b5c0013bdab3>
[2021-08-12 07:51:29,783: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:29,785: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:29,785: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:31,796: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:31,797: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:31,820: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:31,820: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:32,020: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:32,020: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:34,023: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:34,023: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:34,028: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:34,028: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:36,031: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:36,031: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:38,034: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:38,034: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:38,038: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:38,039: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:40,041: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:40,042: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:42,044: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:42,045: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:42,049: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:42,051: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:44,050: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:44,051: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:46,052: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:46,052: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:46,057: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:46,058: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:46,681: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:46,681: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:48,682: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:48,683: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:48,687: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:48,688: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
... and so on ...
- The task is always (automatically) "restarted" after it finishes
- Every execution uses the same task id, here 999e9de0-292f-412d-a9a8-b5c0013bdab3 (also visible in the AsyncResult)
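Because options 2 and 3 set max_retries=None, they never stop on their own. The sketch below models, without Celery, how a retry loop could end either because the task decides it is done or because a retry limit is hit. task_body, run_with_autoretry, and stop_after are hypothetical names for this illustration; in a real bound task the attempt counter would presumably come from self.request.retries.

```python
class RestartTaskNeeded(Exception):
    pass


def task_body(attempt, stop_after):
    """Hypothetical task body: ask for a restart until stop_after attempts have run."""
    if attempt < stop_after:
        raise RestartTaskNeeded
    return "Task-1 Done"


def run_with_autoretry(stop_after, max_retries=None):
    """Toy retry loop: rerun on RestartTaskNeeded, give up past max_retries."""
    attempt = 0  # 0 for the first execution, then 1, 2, ...
    while True:
        try:
            return task_body(attempt, stop_after), attempt
        except RestartTaskNeeded:
            if max_retries is not None and attempt >= max_retries:
                raise  # retry budget exhausted: surface the exception
            attempt += 1


print(run_with_autoretry(stop_after=3))  # ('Task-1 Done', 3)
```

With max_retries=None the loop ends only when the task body stops raising; with a finite limit the exception propagates once the limit is reached.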
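Further reading
Depending on what exactly you are trying to achieve, you may also be interested in Celery canvas, for example task chains (calling one task after another has completed, where the tasks may be different or the same).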