芹菜:如何'|'操作员在链接多个任务时工作?
Celery: How '|' operator works while chaining multi tasks?
我知道 |
是按位 'Or' 运算符,但它让我想知道在链接多个任务时它在 celery 的情况下如何工作。
(first_task.s(url) | second_tasks.s()).apply_async()
我知道第二个任务会将第一个函数的结果作为参数,但这怎么可能呢? “|”在哪里在 dj-celery 源代码中重载?
@task
def second_task(results):
do_something(results)
有人可以提供一些见解吗?
他们可能使用运算符重载 __or__(self, other)
: http://www.rafekettler.com/magicmethods.html
我不知道 Celery 的实现细节,只是给你一个想法:
class Task(object):
def __init__(self, name):
self.name = name
self.chain = [self]
def __or__(self, other):
self.chain.append(other)
return self
def __repr__(self):
return self.name
def apply_async(self):
for c in self.chain:
print "applying", c
(Task('A') | Task('B') | Task('C')).apply_async()
输出:
applying A
applying B
applying C
上面说了Celery重写了__or__
操作符,具体如下:
def __or__(self, other):
if isinstance(other, group):
other = maybe_unroll_group(other)
if not isinstance(self, chain) and isinstance(other, chain):
return chain((self, ) + other.tasks, app=self._app)
elif isinstance(other, chain):
return chain(*self.tasks + other.tasks, app=self._app)
elif isinstance(other, Signature):
if isinstance(self, chain):
return chain(*self.tasks + (other, ), app=self._app)
return chain(self, other, app=self._app)
return NotImplemented
完整的实现在这里:https://github.com/celery/celery/blob/master/celery/canvas.py#L324
我知道 |
是按位 'Or' 运算符,但它让我想知道在链接多个任务时它在 celery 的情况下如何工作。
(first_task.s(url) | second_tasks.s()).apply_async()
我知道第二个任务会将第一个函数的结果作为参数,但这怎么可能呢? “|”在哪里在 dj-celery 源代码中重载?
@task
def second_task(results):
do_something(results)
有人可以提供一些见解吗?
他们可能使用运算符重载 __or__(self, other)
: http://www.rafekettler.com/magicmethods.html
我不知道 Celery 的实现细节,只是给你一个想法:
class Task(object):
def __init__(self, name):
self.name = name
self.chain = [self]
def __or__(self, other):
self.chain.append(other)
return self
def __repr__(self):
return self.name
def apply_async(self):
for c in self.chain:
print "applying", c
(Task('A') | Task('B') | Task('C')).apply_async()
输出:
applying A
applying B
applying C
上面说了Celery重写了__or__
操作符,具体如下:
def __or__(self, other):
if isinstance(other, group):
other = maybe_unroll_group(other)
if not isinstance(self, chain) and isinstance(other, chain):
return chain((self, ) + other.tasks, app=self._app)
elif isinstance(other, chain):
return chain(*self.tasks + other.tasks, app=self._app)
elif isinstance(other, Signature):
if isinstance(self, chain):
return chain(*self.tasks + (other, ), app=self._app)
return chain(self, other, app=self._app)
return NotImplemented
完整的实现在这里:https://github.com/celery/celery/blob/master/celery/canvas.py#L324