任务选项在 celery chord 中停止来自 运行 的任务

Task options stopping tasks from running in celery chord

我正在尝试将现有的 celery 组调用转换为和弦以防止死锁。之前的代码有重试和过期时间。我设法在没有这些设置的情况下使和弦工作,但是当我尝试应用该设置时,我没有看到任务 运行。我在文档中没有看到任何关于在整个和弦上应用相同设置的信息。我是 运行ning celery 3.1.6.

前一个代码:

jobs = group([reset_device.s(topoid, dev_list[i], 
              waittime_list[i], skipflag) for i in range(len(dev_list))]
              ).apply_async(expires=waittime, retry=True, retry_policy={
                                                    'max_retries': 3,
                                                    'interval_start': 0.5,
                                                    'interval_step': 0.2,
                                                    'interval_max': 0.2})
results = jobs.join_native(timeout=waittime + 600, propagate=True)

工作和弦(无设置):

jobs = chord([reset_device.s(topoid, dev_list[i], 
              waittime_list[i], skipflag) for i in range(len(dev_list))])(callback)

非工作和弦#1:

jobs = chord([reset_device.s(topoid, dev_list[i], waittime_list[i],
             skipflag).set(expires=datetime.now() + timedelta(seconds=waittime)).set(retry=True).set(retry_policy=retry_policy)
              for i in range(len(dev_list))])(callback)

非工作和弦#2

jobs = chord([reset_device.subtask(args=(topoid, dev_list[i], waittime_list[i],skipflag), 
              expires=datetime.now()+timedelta(seconds=waittime), retry=True, retry_policy=retry_policy) 
             for i in range(len(dev_list))])(callback)

在#1 和#2 的情况下,chord 中的任务似乎都没有得到 运行。 我如何为和弦中调用的每个任务应用过期时间并重试?

我弄明白了,这是一个问题的混合体。

第一个问题是 expires 字段不接受整数,只接受 datetime 对象,在和弦内(也可能是组和链),尽管文档没有做任何区分。这已在更高版本中修复,我用 3.1.25 测试并能够验证修复。

第二个问题是 celery 3.1.6 不会记录和弦内的错误(我认为组和链也是如此)。这也已经修复了,我在 3.1.25 中测试并且能够看到失败。

第三个问题与错误消息有关:

[2017-08-07 18:39:56,043: ERROR/Worker-5] Chord '98246849-0d5d-4be2-85e3-3fc08e90011d' raised: TaskRevokedError(u'expired',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/builtins.py", line 90, in unlock_chord
ret = j(timeout=3.0, propagate=propagate)
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 691, in join_native
raise value
TaskRevokedError: expired

这是因为时区不对。我使用 datetime.now() 而不是 datetime.utcnow(),它解决了问题并在 3.1.6 中工作。

或者我可以设置 celery 配置 CELERY_ENABLE_UTC = False,默认设置为 True。这让我感到困惑,因为我们将配置 CELERY_TIMEZONE 设置为当地时间。 expires 字段与日期时间对象一起使用时,根据 CELERY_ENABLE_UTC 设置的值使用本地时间或 UTC。我建议保持两个配置设置相同。

有趣的是,创建了回调函数并轮询以查看和弦是否已完成,尽管和弦任务从未执行并且它永远停留在那里。我相信这个may have been fixed in celery 4.1