克隆芹菜链
Cloning a celery chain
我有一个有趣的问题,试图克隆一个 celery 链用于一个组,我的预期用例类似于 group([chain.clone(args=args) for args in it])
但是它一直抱怨没有足够的参数。
我用下面的方法分解了这个
在名为 tasks.py
的文件中
@app.task
def add(x,y):
return x+y
然后从pythonshell
>>> from tasks import add
>>> chain=add.s()|add.s(1)
>>> chain
magic_carpet.celery.add() | add(1)
>>> chain.args
()
>>> chain.delay(2,2)
<AsyncResult: fcc97c30-4700-47a6-aeb6-ffca19a1446f>
>>> cloned_chain=chain.clone(args=(2,))
>>> cloned_chain.args
()
>>> cloned_chain.delay(2)
Traceback (most recent call last):
File "<console>", line 1, in <module>
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 179, in delay
return self.apply_async(partial_args, partial_kwargs)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 557, in apply_async
dict(self.options, **options) if options else self.options))
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 584, in run
first_task.apply_async(**options)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 218, in apply_async
return _apply(args, kwargs, **options)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/app/task.py", line 513, in apply_async
check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() missing 1 required positional argument: 'y'
>>>
显然,clone
没有替换链的克隆副本中的参数,但我不确定为什么,_chain
class 实现了克隆方法记录作为
>>> from celery.canvas import _chain
>>> help(_chain.clone)
Help on function clone in module celery.canvas:
clone(self, *args, **kwargs)
Create a copy of this signature.
Arguments:
args (Tuple): Partial args to be prepended to the existing args.
kwargs (Dict): Partial kwargs to be merged with existing kwargs.
options (Dict): Partial options to be merged with
existing options.
阅读芹菜源代码,我没有发现任何明显的原因。
当前 运行 Celery 4.2.1 和 Python 3.6.6
此功能是否以某种方式损坏、不受支持,或者我是否非常迟钝并做错了什么?
原来核心问题是通过迂回的方式,clone调用构造函数链式创建新的实例。此构造函数不接受任何 args 或 kwargs 应用于链,而是将它们默认为空值,导致它们丢失。
此时我的解决方案是通过创建我自己的克隆方法来解决此问题,该方法修改链中第一个任务的参数。虽然设置 args 属性似乎也有效,但如果在 celery 中再次克隆链,则存储在其中的值将丢失。
我的克隆方法目前支持克隆 tasks
和 chains
,尽管添加对 groups
的支持将是一个微不足道的扩展
def clone_signature(sig, args=(), kwargs=(), **opts):
if sig.subtask_type and sig.subtask_type != "chain":
raise NotImplementedError(
"Cloning only supported for Tasks and chains, not %s" % sig.subtask_type
)
clone = sig.clone()
if hasattr(clone, "tasks"):
t = clone.tasks[0]
else:
t = clone
args, kwargs, opts = t._merge(args=args, kwargs=kwargs, options=opts)
t.update(args=args, kwargs=kwargs, options=deepcopy(opts))
return clone
我有一个有趣的问题,试图克隆一个 celery 链用于一个组,我的预期用例类似于 group([chain.clone(args=args) for args in it])
但是它一直抱怨没有足够的参数。
我用下面的方法分解了这个
在名为 tasks.py
@app.task
def add(x,y):
return x+y
然后从pythonshell
>>> from tasks import add
>>> chain=add.s()|add.s(1)
>>> chain
magic_carpet.celery.add() | add(1)
>>> chain.args
()
>>> chain.delay(2,2)
<AsyncResult: fcc97c30-4700-47a6-aeb6-ffca19a1446f>
>>> cloned_chain=chain.clone(args=(2,))
>>> cloned_chain.args
()
>>> cloned_chain.delay(2)
Traceback (most recent call last):
File "<console>", line 1, in <module>
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 179, in delay
return self.apply_async(partial_args, partial_kwargs)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 557, in apply_async
dict(self.options, **options) if options else self.options))
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 584, in run
first_task.apply_async(**options)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 218, in apply_async
return _apply(args, kwargs, **options)
File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/app/task.py", line 513, in apply_async
check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() missing 1 required positional argument: 'y'
>>>
显然,clone
没有替换链的克隆副本中的参数,但我不确定为什么,_chain
class 实现了克隆方法记录作为
>>> from celery.canvas import _chain
>>> help(_chain.clone)
Help on function clone in module celery.canvas:
clone(self, *args, **kwargs)
Create a copy of this signature.
Arguments:
args (Tuple): Partial args to be prepended to the existing args.
kwargs (Dict): Partial kwargs to be merged with existing kwargs.
options (Dict): Partial options to be merged with
existing options.
阅读芹菜源代码,我没有发现任何明显的原因。
当前 运行 Celery 4.2.1 和 Python 3.6.6
此功能是否以某种方式损坏、不受支持,或者我是否非常迟钝并做错了什么?
原来核心问题是通过迂回的方式,clone调用构造函数链式创建新的实例。此构造函数不接受任何 args 或 kwargs 应用于链,而是将它们默认为空值,导致它们丢失。
此时我的解决方案是通过创建我自己的克隆方法来解决此问题,该方法修改链中第一个任务的参数。虽然设置 args 属性似乎也有效,但如果在 celery 中再次克隆链,则存储在其中的值将丢失。
我的克隆方法目前支持克隆 tasks
和 chains
,尽管添加对 groups
的支持将是一个微不足道的扩展
def clone_signature(sig, args=(), kwargs=(), **opts):
if sig.subtask_type and sig.subtask_type != "chain":
raise NotImplementedError(
"Cloning only supported for Tasks and chains, not %s" % sig.subtask_type
)
clone = sig.clone()
if hasattr(clone, "tasks"):
t = clone.tasks[0]
else:
t = clone
args, kwargs, opts = t._merge(args=args, kwargs=kwargs, options=opts)
t.update(args=args, kwargs=kwargs, options=deepcopy(opts))
return clone