设置和调用一组带有单独倒计时的芹菜任务

Setting up and calling a group of celery tasks with individual countdown

使用:Django==2.2.24Python=3.6celery==4.3.0

这是我目前正在做的事情:

from celery import group

the_group_of_tasks = group(
  some_task.s(an_object.the_data_dict)
  for an_object in AnObject.objects.all()
)
the_group_of_tasks.delay()

我想做的事情:
group 文档:celery docs link
我想在某个时间范围内分散 the_group_of_taskssome_task 调用。
最好是我可以使用 countdown 功能,并将任务分散在可变的秒数(例如一个小时,3600 秒)内。
分配将完成为 0 到 3600 之间的随机秒整数,想象一下一旦我有了范围就可以很容易地计算它。

我想我可以添加 countdown arg,在我的范围内使用随机数生成器,这样它将被“打包”并准备好在 group 中与个人一起执行任务准备?

some_task.s(an_object.the_data_dict, countdown=some_generator_call)

这样行吗?

In the docs,看起来 signature 应该支持 countdown:

>>> from celery import signature
>>> signature('tasks.add', args=(2, 2), countdown=10)

谢谢!

编辑:
我尝试添加倒计时,如下所示:

the_group_of_tasks = group(
  some_task.s(an_object.the_data_dict, countdown=10)
  for an_object in AnObject.objects.all()
)
the_group_of_tasks.delay()

但是看到这个错误:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 385, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib64/python3.6/site-packages/newrelic/hooks/application_celery.py", line 85, in wrapper
    return wrapped(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 648, in __protected_call__
    return self.run(*args, **kwargs)
TypeError: run() got an unexpected keyword argument 'countdown'

所以问题是 countdown 被假定为任务的参数,而它应该是 apply_async/delay 调用的参数。

这解决了问题:

the_group_of_tasks = group(
  some_task.signature((an_object.the_data_dict), countdown=10)
  for an_object in AnObject.objects.all()
)
the_group_of_tasks.delay()
  1. .s() 已更改为 .signature
  2. 参数被分隔开,不再有歧义

谢谢!