Celery eta argument is not respected in task signature
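I have several tasks that need to run one after another, and I need a dynamic delay (timeout) between the execution of task_a and task_b.
I want to pass the eta argument to task_b, rather than call time.sleep inside task_a (which defeats the asynchronous approach) or call task_b from inside task_a (which can hurt code reuse).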
task_a = task_a.s()
task_b = task_b.signature(
    args=(arg_a,),
    starkwargs={
        # Not working: task_b is executed immediately. When I use the 'options'
        # keyword, I get "AttributeError: 'unicode' object has no attribute 'isoformat'".
        'eta': datetime.now() + timedelta(minutes=dynamic_coefficient)
    }
)
(task_a | task_b).apply_async(
    # This does work, but it is useless for me because it delays task_a, not task_b.
    eta=datetime.now() + timedelta(minutes=1)
)
As documented, the ETA is:
"a specific date and time that is the earliest time at which your task will be executed."
"eta must be a datetime object, specifying an exact date and time (including millisecond precision, and timezone information)"
So eta does not delay the task by n seconds/minutes/hours relative to the previous task; it only says that the task must not run before a fixed target time. The solution I found is to use countdown instead of eta.
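To make the difference concrete, here is a small standard-library sketch of the timing arithmetic (no Celery needed). The helper names wait_for_eta and eta_from_countdown are made up for illustration and are not Celery APIs, the timestamps are arbitrary, and the sketch assumes that a countdown is turned into an ETA at the moment the task is actually sent, which is what the logs further down suggest.

```python
from datetime import datetime, timedelta, timezone


def wait_for_eta(eta: datetime, now: datetime) -> float:
    """Seconds a task still has to wait; 0.0 if the ETA is already in the past."""
    return max(0.0, (eta - now).total_seconds())


def eta_from_countdown(countdown: float, sent_at: datetime) -> datetime:
    """Assumed behaviour: a countdown becomes 'send time + countdown'."""
    return sent_at + timedelta(seconds=countdown)


# An absolute ETA is fixed at the moment the signature is built.
built_at = datetime(2021, 8, 17, 13, 33, 19, tzinfo=timezone.utc)
fixed_eta = built_at + timedelta(seconds=3)

# The second task is only sent 5 seconds later, once the first one is done.
sent_at = built_at + timedelta(seconds=5)

stale_wait = wait_for_eta(fixed_eta, now=sent_at)
fresh_wait = wait_for_eta(eta_from_countdown(3, sent_at), now=sent_at)

print(stale_wait)  # 0.0: the ETA has already passed, so the task runs right away
print(fresh_wait)  # 3.0: the countdown is measured from the send time
```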
tasks.py
from datetime import datetime, timezone
import time
from celery import Celery

app = Celery('tasks')

# bind=True exposes self.request, so each task can print the ETA it was sent with.
@app.task(bind=True)
def add(self, x, y):
    print(f"{datetime.now(timezone.utc)} Called add for {self.request.eta}")
    return x + y

@app.task(bind=True)
def mul(self, x, y):
    print(f"{datetime.now(timezone.utc)} Called mul for {self.request.eta}")
    return x * y
Problem: delaying execution with ETA
from datetime import datetime, timedelta, timezone
import time
from tasks import add, mul

# The ETA is 3 seconds from NOW, not from the end of the first task's execution.
target_eta = datetime.now(timezone.utc) + timedelta(seconds=3)
task_a = add.signature((1, 2))
task_b = mul.signature((3,), eta=target_eta)
# After this sleep the ETA is already in the past, so task_b will run as soon as
# possible. That is not what we want, but it is what we configured.
time.sleep(5)
(task_a | task_b).apply_async()
[2021-08-17 21:33:24,831: INFO/MainProcess] Task tasks.add[0058965a-f9da-47dc-8f40-e111deb9bfbb] received
[2021-08-17 21:33:24,832: WARNING/ForkPoolWorker-4] 2021-08-17 13:33:24.832190+00:00 Called add for None
[2021-08-17 21:33:24,832: WARNING/ForkPoolWorker-4]
[2021-08-17 21:33:24,833: INFO/ForkPoolWorker-4] Task tasks.add[0058965a-f9da-47dc-8f40-e111deb9bfbb] succeeded in 0.0014381749997482984s: 3
[2021-08-17 21:33:24,834: INFO/MainProcess] Task tasks.mul[6df9a54f-04f3-4178-8d23-eb2bd9b9c47b] received
[2021-08-17 21:33:24,930: WARNING/ForkPoolWorker-4] 2021-08-17 13:33:24.930689+00:00 Called mul for 2021-08-17T13:33:22.814930Z
[2021-08-17 21:33:24,930: WARNING/ForkPoolWorker-4]
[2021-08-17 21:33:24,931: INFO/ForkPoolWorker-4] Task tasks.mul[6df9a54f-04f3-4178-8d23-eb2bd9b9c47b] succeeded in 0.0004077819999110943s: 9
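- As you can see, it was actually not wrong to call the second task immediately: its ETA was 2021-08-17T13:33:22.814930Z, and by the time it was called (2021-08-17 21:33:24,930 local time, i.e. 13:33:24.930 UTC) that moment had already passed.
- So the ETA was respected; we just did not see the supposed 3-second delay because we were already past it.
- If you want to see a delay between the first and second task with eta, you have to set the ETA relative to the expected end of the first task's execution, not relative to the current time.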
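If you do want to stay with eta, the last point could be sketched as below. eta_after_first_task and expected_runtime are hypothetical names, and the runtime of the first task is an estimate you would have to supply yourself, which is the weak spot of this approach.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def eta_after_first_task(expected_runtime: timedelta,
                         delay: timedelta,
                         now: Optional[datetime] = None) -> datetime:
    """Absolute ETA = now + estimated runtime of the first task + desired delay."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now + expected_runtime + delay


# Fixed start time so the example is reproducible.
start = datetime(2021, 8, 17, 13, 33, 24, tzinfo=timezone.utc)
eta = eta_after_first_task(timedelta(seconds=5), timedelta(seconds=3), now=start)
print(eta.isoformat())  # 2021-08-17T13:33:32+00:00
```

The result would then be passed as mul.signature((3,), eta=eta) right before calling apply_async(). If the first task runs longer than estimated, the delay shrinks accordingly.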
Solution: delaying execution with countdown
import time
from tasks import add, mul

target_countdown = 3  # seconds
task_a = add.signature((1, 2))
task_b = mul.signature((3,), countdown=target_countdown)
# The sleep does not matter here: the countdown is applied relative to the end
# of the first task in the chain.
time.sleep(5)
(task_a | task_b).apply_async()
[2021-08-17 21:38:17,659: INFO/MainProcess] Task tasks.add[5712dee5-3c07-4e85-87bd-154b8db41784] received
[2021-08-17 21:38:17,660: WARNING/ForkPoolWorker-4] 2021-08-17 13:38:17.660491+00:00 Called add for None
[2021-08-17 21:38:17,660: WARNING/ForkPoolWorker-4]
[2021-08-17 21:38:17,662: INFO/ForkPoolWorker-4] Task tasks.add[5712dee5-3c07-4e85-87bd-154b8db41784] succeeded in 0.0016173230001186312s: 3
[2021-08-17 21:38:17,663: INFO/MainProcess] Task tasks.mul[12cc2749-8dbd-46c4-9e91-e0007fffd84e] received
[2021-08-17 21:38:20,798: WARNING/ForkPoolWorker-4] 2021-08-17 13:38:20.798231+00:00 Called mul for 2021-08-17T13:38:20.661087+00:00
[2021-08-17 21:38:20,798: WARNING/ForkPoolWorker-4]
[2021-08-17 21:38:20,798: INFO/ForkPoolWorker-4] Task tasks.mul[12cc2749-8dbd-46c4-9e91-e0007fffd84e] succeeded in 0.0005633190003209165s: 9
- Now, as expected, there is a 3-second delay between the execution of the first and the second task.
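Since the question expresses the delay in minutes through dynamic_coefficient, while countdown is given in seconds, a small conversion helper may be handy. countdown_seconds is a hypothetical name used only for this sketch.

```python
from datetime import timedelta


def countdown_seconds(dynamic_coefficient: float) -> float:
    """Convert a delay given in minutes into the seconds that countdown expects."""
    return timedelta(minutes=dynamic_coefficient).total_seconds()


print(countdown_seconds(1.5))  # 90.0
```

With the names from the question, this would be used as task_b.signature(args=(arg_a,), countdown=countdown_seconds(dynamic_coefficient)).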