如何配置芹菜与多进程并发执行?
How to configure celery for concurrent execution with multi-process?
我有一个与外部 API 对话的任务,json
响应非常大,我必须多次进行此调用,然后进行进一步的 python 处理。为了减少耗时,我最初尝试了:
def make_call(*args, **kwargs):
pass
def make_another(*args, **kwargs):
pass
def get_calls():
return make_call, make_another
def task(*args, **kwargs):
procs = [Process(target=get_calls()[i], args=(,),
kwargs={}) for i in range(3)]
_start = [proc.start() for proc in procs]
_join = [proc.join() for proc in procs]
#
transaction.on_commit(lambda: task.delay())
但是,我运行变成了一个AssertionError:
守护进程,不允许有子进程。使用附加进程加速 celery 任务的最佳方法是什么?
Celery worker 已经创建了许多进程。利用许多工作进程而不是创建子进程。您可以将工作委托给芹菜工人。这将导致更多 stable/reliable 执行。
您可以只从客户端代码创建许多任务,也可以使用 celery 的原语,如 chains or chords to parallelize the work. These can also be composed with other primitives like groups 等
例如,在您的场景中,您可能有两项任务:一项是进行 API 调用 make_api_call
,另一项是解析响应 parse_response
。您可以 将它们 链接在一起。
# chain another task when a task completes successfully
res = make_api_call.apply_async((0,), link=parse_response.s())
# chain syntax 1
result_1 = chain(make_api_call.s(1), parse_response.s())
# syntax 2 with | operator
result_b = make_api_call.s(2) | parse_response.s()
# can group chains
job = group([
chain(make_api_call.s(i), parse_response.s())
for i in range(3)
]
)
result = job.apply_async()
这只是一个通用示例。您可以创建任务并将它们组合成您的工作流需要。请参阅:Canvas: Designing Work-flows 了解更多信息。
我有一个与外部 API 对话的任务,json
响应非常大,我必须多次进行此调用,然后进行进一步的 python 处理。为了减少耗时,我最初尝试了:
def make_call(*args, **kwargs):
pass
def make_another(*args, **kwargs):
pass
def get_calls():
return make_call, make_another
def task(*args, **kwargs):
procs = [Process(target=get_calls()[i], args=(,),
kwargs={}) for i in range(3)]
_start = [proc.start() for proc in procs]
_join = [proc.join() for proc in procs]
#
transaction.on_commit(lambda: task.delay())
但是,我运行变成了一个AssertionError:
守护进程,不允许有子进程。使用附加进程加速 celery 任务的最佳方法是什么?
Celery worker 已经创建了许多进程。利用许多工作进程而不是创建子进程。您可以将工作委托给芹菜工人。这将导致更多 stable/reliable 执行。
您可以只从客户端代码创建许多任务,也可以使用 celery 的原语,如 chains or chords to parallelize the work. These can also be composed with other primitives like groups 等
例如,在您的场景中,您可能有两项任务:一项是进行 API 调用 make_api_call
,另一项是解析响应 parse_response
。您可以 将它们 链接在一起。
# chain another task when a task completes successfully
res = make_api_call.apply_async((0,), link=parse_response.s())
# chain syntax 1
result_1 = chain(make_api_call.s(1), parse_response.s())
# syntax 2 with | operator
result_b = make_api_call.s(2) | parse_response.s()
# can group chains
job = group([
chain(make_api_call.s(i), parse_response.s())
for i in range(3)
]
)
result = job.apply_async()
这只是一个通用示例。您可以创建任务并将它们组合成您的工作流需要。请参阅:Canvas: Designing Work-flows 了解更多信息。