为什么这种构建一组链会导致 Celery 出现异常?
Why is this construction of a group of chains causing an exception in Celery?
我提前为没有提供 MVCE 道歉——不幸的是,由于这个问题的性质,它不适合用最少的例子。尽管如此,我认为如果没有 MVCE,它仍然会很负责任。
我有一个任务列表,客户可以从中 select 在 Flask 中创建一个任务子集。我这样创建任务:
current_app.logger.info("Creating list of chained tasks..")
chains = [functools.reduce(
lambda x, y: x | y.s(foo, bar), remaining_tasks, first_task.s(foo, bar)
) for foo in foos]
所有任务都有相似的函数签名,类似于
@celery.task
def my_task(baz, foo, bar):
# ...
return baz
并且我尝试以下列方式执行该组:
current_app.logger.info("Created a group of chained tasks..")
g = group(*chains)
res = g.apply_async(args=(baz,), queue="default")
我发现调用 apply_async
时会引发两个异常:
Traceback (most recent call last):
File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/utils/functional.py", line 209, in __getitem__
return self.__consumed[index]
IndexError: list index out of range
和
File "/Users/erip/Code/whatever.py", line 101, in blahblah
res = g.apply_async(args=(baz,), queue="default")
File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/canvas.py", line 977, in apply_async
app = self.app
File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/canvas.py", line 1144, in app
app = self.tasks[0].app
File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/utils/functional.py", line 213, in __getitem__
self.__consumed.append(next(self.__it))
TypeError: 'Signature' object is not an iterator
docs表明我构建的链是有效的,所以我不明白为什么异步应用程序会导致问题。
我的目标是创建一组异步应用的 len(foos)
链。我发现这种行为仅在 len(foos) == 1
.
时发生
有没有人运行遇到过这个问题?
我遇到过类似的问题,celery docs有如下注释:
If only one argument is passed, and that argument is an iterable
then that'll be used as the list of tasks instead: this
allows us to use group
with generator expressions.
看一下Groupclass的构造函数。如果我们只传递一个签名来初始化一个 group 对象,这个签名将被视为一个生成器。
def __init__(self, *tasks, **options):
if len(tasks) == 1:
tasks = tasks[0]
if isinstance(tasks, group):
tasks = tasks.tasks
if not isinstance(tasks, _regen):
tasks = regen(tasks)
Signature.__init__(
self, 'celery.group', (), {'tasks': tasks}, **options
)
self.subtask_type = 'group'
对于您的情况,您可以简单地按以下方式执行一组任务:
current_app.logger.info("Created a group of chained tasks..")
if len(chains) == 1:
g = group(chains)
else:
g = group(*chains)
res = g.apply_async(args=(baz,), queue="default")
我提前为没有提供 MVCE 道歉——不幸的是,由于这个问题的性质,它不适合用最少的例子。尽管如此,我认为如果没有 MVCE,它仍然会很负责任。
我有一个任务列表,客户可以从中 select 在 Flask 中创建一个任务子集。我这样创建任务:
current_app.logger.info("Creating list of chained tasks..")
chains = [functools.reduce(
lambda x, y: x | y.s(foo, bar), remaining_tasks, first_task.s(foo, bar)
) for foo in foos]
所有任务都有相似的函数签名,类似于
@celery.task
def my_task(baz, foo, bar):
# ...
return baz
并且我尝试以下列方式执行该组:
current_app.logger.info("Created a group of chained tasks..")
g = group(*chains)
res = g.apply_async(args=(baz,), queue="default")
我发现调用 apply_async
时会引发两个异常:
Traceback (most recent call last):
File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/utils/functional.py", line 209, in __getitem__
return self.__consumed[index]
IndexError: list index out of range
和
File "/Users/erip/Code/whatever.py", line 101, in blahblah
res = g.apply_async(args=(baz,), queue="default")
File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/canvas.py", line 977, in apply_async
app = self.app
File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/canvas.py", line 1144, in app
app = self.tasks[0].app
File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/utils/functional.py", line 213, in __getitem__
self.__consumed.append(next(self.__it))
TypeError: 'Signature' object is not an iterator
docs表明我构建的链是有效的,所以我不明白为什么异步应用程序会导致问题。
我的目标是创建一组异步应用的 len(foos)
链。我发现这种行为仅在 len(foos) == 1
.
有没有人运行遇到过这个问题?
我遇到过类似的问题,celery docs有如下注释:
If only one argument is passed, and that argument is an iterable
then that'll be used as the list of tasks instead: this
allows us to usegroup
with generator expressions.
看一下Groupclass的构造函数。如果我们只传递一个签名来初始化一个 group 对象,这个签名将被视为一个生成器。
def __init__(self, *tasks, **options):
if len(tasks) == 1:
tasks = tasks[0]
if isinstance(tasks, group):
tasks = tasks.tasks
if not isinstance(tasks, _regen):
tasks = regen(tasks)
Signature.__init__(
self, 'celery.group', (), {'tasks': tasks}, **options
)
self.subtask_type = 'group'
对于您的情况,您可以简单地按以下方式执行一组任务:
current_app.logger.info("Created a group of chained tasks..")
if len(chains) == 1:
g = group(chains)
else:
g = group(*chains)
res = g.apply_async(args=(baz,), queue="default")