为什么这种构建一组链会导致 Celery 出现异常?

Why is this construction of a group of chains causing an exception in Celery?

我提前为没有提供 MVCE 道歉——不幸的是,由于这个问题的性质,它不适合用最少的例子。尽管如此,我认为如果没有 MVCE,它仍然会很负责任。

我有一个任务列表,客户可以从中 select 在 Flask 中创建一个任务子集。我这样创建任务:

current_app.logger.info("Creating list of chained tasks..")
chains = [functools.reduce(
    lambda x, y: x | y.s(foo, bar), remaining_tasks, first_task.s(foo, bar)
) for foo in foos]

所有任务都有相似的函数签名,类似于

@celery.task
def my_task(baz, foo, bar):
    # ...
    return baz

并且我尝试以下列方式执行该组:

current_app.logger.info("Created a group of chained tasks..")
g = group(*chains)
res = g.apply_async(args=(baz,), queue="default")

我发现调用 apply_async 时会引发两个异常:

Traceback (most recent call last):
  File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/utils/functional.py", line 209, in __getitem__
    return self.__consumed[index]
IndexError: list index out of range

File "/Users/erip/Code/whatever.py", line 101, in blahblah
    res = g.apply_async(args=(baz,), queue="default")
  File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/canvas.py", line 977, in apply_async
    app = self.app
  File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/canvas.py", line 1144, in app
    app = self.tasks[0].app
  File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/utils/functional.py", line 213, in __getitem__
    self.__consumed.append(next(self.__it))
TypeError: 'Signature' object is not an iterator

docs表明我构建的链是有效的,所以我不明白为什么异步应用程序会导致问题。

我的目标是创建一组异步应用的 len(foos) 链。我发现这种行为仅在 len(foos) == 1.

时发生

有没有人运行遇到过这个问题?

我遇到过类似的问题,celery docs有如下注释:

If only one argument is passed, and that argument is an iterable
then that'll be used as the list of tasks instead: this
allows us to use group with generator expressions.

看一下Groupclass的构造函数。如果我们只传递一个签名来初始化一个 group 对象,这个签名将被视为一个生成器。

def __init__(self, *tasks, **options):                                       
    if len(tasks) == 1:                                                      
        tasks = tasks[0]                                                     
        if isinstance(tasks, group):                                         
            tasks = tasks.tasks                                              
        if not isinstance(tasks, _regen):                                    
            tasks = regen(tasks)                                             
    Signature.__init__(                                                      
        self, 'celery.group', (), {'tasks': tasks}, **options                
    )                                                                        
    self.subtask_type = 'group'  

对于您的情况,您可以简单地按以下方式执行一组任务:

current_app.logger.info("Created a group of chained tasks..")
if len(chains) == 1:
    g = group(chains)
else:
    g = group(*chains)
res = g.apply_async(args=(baz,), queue="default")