Django celery - 完成后重新启动任务

Django celery - Restart the task when it's completed


    @app.task
    def Task1():
        print("this is task 1")
        return  "Task-1 Done"

举个例子我想任务完成后重启

手动连续调用

如果要多次调用任务,并且每次都使用相同的任务id,可以使用apply_asynctask_id参数。

请注意,这不适用于 delay,如文档所述:

delay(*args, **kwargs)

  • Star argument version of apply_async().

  • Does not support the extra options enabled by apply_async().

@app.task(bind=True)
def Task1(self):
    print(f"this is task 1 {self.request.id}")
>>> from tasks import Task1
>>> result = Task1.apply_async()
>>> result
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> result.id
'ba488582-9d7d-4bda-a19d-a2b0bf9b503f'
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> 
[2021-08-12 08:24:31,537: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:24:31,538: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:24:31,539: WARNING/ForkPoolWorker-4] 

[2021-08-12 08:24:31,539: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.00041928999962692615s: None
[2021-08-12 08:25:00,608: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:00,609: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:00,609: WARNING/ForkPoolWorker-4] 

[2021-08-12 08:25:00,609: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0002528750001147273s: None
[2021-08-12 08:25:06,137: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:06,139: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:06,139: WARNING/ForkPoolWorker-4] 

[2021-08-12 08:25:06,139: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0003467680007815943s: None
[2021-08-12 08:25:10,537: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:10,539: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:10,539: WARNING/ForkPoolWorker-4] 

[2021-08-12 08:25:10,539: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0006299719998423825s: None
  • 所有执行的任务id都是相同的,这里是ba488582-9d7d-4bda-a19d-a2b0bf9b503f(在AsyncResult中也可以看到)

自动连续调用

如果您想继续重新启动任务,这里有一些选项。下面的所有选项都是无限递归的。您可能希望在任务中设置一些条件来确定无限循环何时终止,例如向任务添加输入并在执行必须已经停止时将其用作基础。

选项 1: 在同一任务中调用任务本身 asynchronously。这有点像递归。这将使用与在手动连续调用(见上文)中完成的相同的任务 ID。

@app.task(bind=True)
def Task1(self):
    print(f"this is task 1 {self.request.id}")
    time.sleep(2)
    print("re-trigger task 1")
    Task1.apply_async(task_id=self.request.id)

方案二:触发celery提供的retry机制。这将在 link:

中记录的同一队列上使用相同的任务 ID

When you call retry it’ll send a new message, using the same task-id, and it’ll take care to make sure the message is delivered to the same queue as the originating task.

我们可以通过 displaying the task id 通过 self.request.id 验证这一点。

@app.task(
    bind=True,
    default_retry_delay=0.1,
    retry_backoff=False,
    max_retries=None,
)
def Task1(self):
    print(f"this is task 1 {self.request.id}")
    time.sleep(2)
    print("re-trigger task 1")
    raise self.retry()

选项 3: 仅重试 specific scenario(此处为 RestartTaskNeeded)。与选项 2 相同,这也会在同一队列上使用相同的任务 ID。

class RestartTaskNeeded(Exception):
    pass


@app.task(
    bind=True,
    autoretry_for=(RestartTaskNeeded,),
    default_retry_delay=0.1,
    retry_backoff=False,
    max_retries=None,
)
def Task1(self):
    print(f"this is task 1 {self.request.id}")
    time.sleep(2)
    print("re-trigger task 1")
    raise RestartTaskNeeded

输出:

>>> from tasks import Task1
>>> Task1.apply_async()
<AsyncResult: 999e9de0-292f-412d-a9a8-b5c0013bdab3>
[2021-08-12 07:51:29,783: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:29,785: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:29,785: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:31,796: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:31,797: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:31,820: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:31,820: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:32,020: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:32,020: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:34,023: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:34,023: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:34,028: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:34,028: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:36,031: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:36,031: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:38,034: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:38,034: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:38,038: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:38,039: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:40,041: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:40,042: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:42,044: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:42,045: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:42,049: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:42,051: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:44,050: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:44,051: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:46,052: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:46,052: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:46,057: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:46,058: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:46,681: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:46,681: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:48,682: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:48,683: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:48,687: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:48,688: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received

... and so on ...
  • 任务总是(自动)在完成后“重新启动”
  • 所有执行的任务id都是相同的,这里是999e9de0-292f-412d-a9a8-b5c0013bdab3(在AsyncResult中也可以看到)

进一步阅读

根据您对此问题的确切目的,您可能还对 Celery canvas 例如任务链(在完成另一个任务后调用一个任务,这些任务可能不同也可能相同)。