Groups of chains with positional arguments in partial tasks using Celery
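I am writing an application that will asynchronously execute a group of multiple chains of synchronous tasks.
In other words, I might have the pipeline foo(a,b,c) -> boo(a,b,c) for some list of bs.
My understanding is that I should create a chain foo(a,b,c) | boo(a,b,c) for each b in this list. These chains would then form a Celery group, which can be applied asynchronously.
My code is below: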
my_app.py
#!/usr/bin/env python3
import functools
import time

from celery import chain, group, Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)
app = Celery("my_app", broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')


@app.task
def foo(a, b, c):
    logger.info("foo from {0}!".format(b))
    return b


@app.task
def boo(a, b, c):
    logger.info("boo from {0}!".format(b))
    return b


def break_up_tasks(tasks):
    try:
        first_task, *remaining_tasks = tasks
    except ValueError as e:
        first_task, remaining_tasks = [], []
    return first_task, remaining_tasks


def do_tasks(a, bs, c, opts):
    tasks = [foo, boo]
    # There should be an option for each task
    if len(opts) != len(tasks):
        raise ValueError("There should be {0} provided options".format(len(tasks)))
    # Create a list of tasks that should be included per the list of options' boolean values
    tasks = [task for opt, task in zip(opts, tasks) if opt]
    first_task, remaining_tasks = break_up_tasks(tasks)
    # If there are no tasks, we're done.
    if not first_task: return
    chains = (
        functools.reduce(
            # `a` should be provided by `apply_async`'s `args` kwarg
            # `b` should be provided by previous partials in chain
            lambda x, y: x | y.s(c),
            remaining_tasks, first_task.s(a, b, c)
        ) for b in bs
    )
    g = group(*chains)
    res = g.apply_async(args=(a,), queue="default")
    print("Applied async... waiting for termination.")
    total_tasks = len(tasks)
    while not res.ready():
        print("Waiting... {0}/{1} tasks complete".format(res.completed_count(), total_tasks))
        time.sleep(1)


if __name__ == "__main__":
    a = "whatever"
    bs = ["hello", "world"]
    c = "baz"
    opts = [
        # do "foo"
        True,
        # do "boo"
        True
    ]
    do_tasks(a, bs, c, opts)
Running celery
celery worker -A my_app -l info -c 5 -Q default
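What I am finding, though, is that when I run the above, my client runs in an infinite loop because boo is missing an argument:
TypeError: boo() missing 1 required positional argument: 'c'
My understanding is that apply_async will provide the args kwarg to each chain, and that previous links in the chain will provide their return value to subsequent links.
Why is boo not receiving its arguments properly? I'm sure these tasks aren't well written, as this is my first foray into Celery. If you have other suggestions, I'm happy to entertain them.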
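After debugging your code (I'm new to Celery too! :)) I learned that each chained function gets the result of the previous chained call as its first argument. With y.s(c), boo is therefore called as boo(result, c) and the third parameter is never filled. So I believe the solution to your problem is to add the one missing argument (the second one, b) to y.s in the reduce: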
chains = (
    functools.reduce(
        # `a` should be provided by `apply_async`'s `args` kwarg
        # `b` should be provided by previous partials in chain
        lambda x, y: x | y.s(b, c),  # <- here is the 'new guy'
        remaining_tasks, first_task.s(a, b, c)
    ) for b in bs
)
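To make the argument passing concrete, here is a rough plain-Python model of it. This is only a sketch and not Celery's own code: `run_chain` and the `(function, partial_args)` tuples are hypothetical stand-ins for a chain and for `.s(...)` signatures, and the only behaviour it assumes is that the previous link's return value is prepended to the next link's partial arguments.

```python
def foo(a, b, c):
    return b


def boo(a, b, c):
    return b


def run_chain(first, rest):
    """Hypothetical model of a chain.

    Each link is a (function, partial_args) tuple. The first link is called
    with its own arguments; every later link is called with the previous
    result followed by its partial arguments.
    """
    func, args = first
    result = func(*args)
    for func, args in rest:
        result = func(result, *args)
    return result


a, b, c = "whatever", "hello", "baz"

# Analogue of `foo.s(a, b, c) | boo.s(c)`: boo is called as boo(result, c),
# so its third parameter is never supplied.
error = None
try:
    run_chain((foo, (a, b, c)), [(boo, (c,))])
except TypeError as exc:
    error = str(exc)
print(error)

# Analogue of `foo.s(a, b, c) | boo.s(b, c)`: boo is called as boo(result, b, c).
fixed = run_chain((foo, (a, b, c)), [(boo, (b, c))])
print(fixed)
```

In this model the first call fails with the same kind of TypeError as in the question, while the second returns the value of b.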
Hope this helps.