任务签名中不尊重 celery eta 参数

celery eta argument is not respected in task signature

我有几个任务需要依次执行。我需要在执行 task_a 和 task_b 之间设置动态超时。

我想将 eta[​​=17=] 参数传递给 task_b,而不是 task_a 内部的 time.sleep(破坏异步方法)或调用 task_b inside task_a(代码重用可能存在问题)。

task_a = task_a.s()
task_b = task_b.signature(
    args=(arg_a,),
    starkwargs={
        'eta': datetime.now() + timedelta(minutes=dynamic_coefficient)  # not working, task_b is executed immediately. When I use 'options' keyword, I get "AttributeError: 'unicode' object has no attribute 'isoformat'"
    }
)
(task_a | task_b).apply_async(
    eta=datetime.now() + timedelta(minutes=1)  # this does work, but useless for me as delays task_a, not task_b
)

作为documented,预计到达时间是:

a specific date and time that is the earliest time at which your task will be executed.

eta must be a datetime object, specifying an exact date and time (including millisecond precision, and timezone information)

所以它实际上并没有将任务的执行延迟n seconds/minutes/hours/etc,它所做的只是在目标时间专门执行任务.我找到的解决方案是使用 countdown 而不是 eta.

tasks.py

from datetime import datetime, timezone
import time

from celery import Celery

app = Celery('tasks')


@app.task(bind=True)
def add(self, x, y):
    print(f"{datetime.now(timezone.utc)} Called add for {self.request.eta}")
    return x + y


@app.task(bind=True)
def mul(self, x, y):
    print(f"{datetime.now(timezone.utc)} Called mul for {self.request.eta}")
    return x * y

问题:使用 ETA 延迟执行

target_eta = datetime.now(timezone.utc) + timedelta(seconds=3)  # ETA is 3 seconds from now. To emphasize, the ETA is with respect to NOW, not with respect to the end of the 1st task's execution.

task_a = add.signature((1, 2))
task_b = mul.signature((3,), eta=target_eta)

time.sleep(5)  # Now, we are already past the indicated ETA. Thus it would now be executed as soon as possible, which we don't want to but it was how we configured it.

(task_a | task_b).apply_async()
[2021-08-17 21:33:24,831: INFO/MainProcess] Task tasks.add[0058965a-f9da-47dc-8f40-e111deb9bfbb] received
[2021-08-17 21:33:24,832: WARNING/ForkPoolWorker-4] 2021-08-17 13:33:24.832190+00:00 Called add for None
[2021-08-17 21:33:24,832: WARNING/ForkPoolWorker-4] 

[2021-08-17 21:33:24,833: INFO/ForkPoolWorker-4] Task tasks.add[0058965a-f9da-47dc-8f40-e111deb9bfbb] succeeded in 0.0014381749997482984s: 3
[2021-08-17 21:33:24,834: INFO/MainProcess] Task tasks.mul[6df9a54f-04f3-4178-8d23-eb2bd9b9c47b] received
[2021-08-17 21:33:24,930: WARNING/ForkPoolWorker-4] 2021-08-17 13:33:24.930689+00:00 Called mul for 2021-08-17T13:33:22.814930Z
[2021-08-17 21:33:24,930: WARNING/ForkPoolWorker-4] 

[2021-08-17 21:33:24,931: INFO/ForkPoolWorker-4] Task tasks.mul[6df9a54f-04f3-4178-8d23-eb2bd9b9c47b] succeeded in 0.0004077819999110943s: 9
  • 如您所见,立即调用第二个任务实际上并没有错,因为它的 ETA 是 2021-08-17T13:33:22.814930Z,而那一刻的时间已经过了 2021-08-17 21:33:24,930
  • 因此,它尊重预计到达时间,只是我们没有看到假定的 3 秒延迟,因为我们已经超过了它。
  • 如果您想查看第一个和第二个任务之间的延迟,那么您必须将 ETA 设置为相对于第一个任务执行结束的假设时间,而不是相对于当前任务时间.

解决方案:使用倒计时延迟执行

target_countdown = 3

task_a = add.signature((1, 2))
task_b = mul.signature((3,), countdown=target_countdown)

time.sleep(5)  # Doesn't matter, the countdown applies with respect to the end of the 1st task in chain

(task_a | task_b).apply_async()
[2021-08-17 21:38:17,659: INFO/MainProcess] Task tasks.add[5712dee5-3c07-4e85-87bd-154b8db41784] received
[2021-08-17 21:38:17,660: WARNING/ForkPoolWorker-4] 2021-08-17 13:38:17.660491+00:00 Called add for None
[2021-08-17 21:38:17,660: WARNING/ForkPoolWorker-4] 

[2021-08-17 21:38:17,662: INFO/ForkPoolWorker-4] Task tasks.add[5712dee5-3c07-4e85-87bd-154b8db41784] succeeded in 0.0016173230001186312s: 3
[2021-08-17 21:38:17,663: INFO/MainProcess] Task tasks.mul[12cc2749-8dbd-46c4-9e91-e0007fffd84e] received
[2021-08-17 21:38:20,798: WARNING/ForkPoolWorker-4] 2021-08-17 13:38:20.798231+00:00 Called mul for 2021-08-17T13:38:20.661087+00:00
[2021-08-17 21:38:20,798: WARNING/ForkPoolWorker-4] 

[2021-08-17 21:38:20,798: INFO/ForkPoolWorker-4] Task tasks.mul[12cc2749-8dbd-46c4-9e91-e0007fffd84e] succeeded in 0.0005633190003209165s: 9
  • 现在正如预期的那样,在第一个和第二个任务的执行之间已经有 3 秒的延迟。