如何配置芹菜与多进程并发执行?

How to configure celery for concurrent execution with multi-process?

我有一个与外部 API 对话的任务,json 响应非常大,我必须多次进行此调用,然后进行进一步的 python 处理。为了减少耗时,我最初尝试了:

def make_call(*args, **kwargs):
    pass

def make_another(*args, **kwargs):
    pass

def get_calls():
    return make_call, make_another

def task(*args, **kwargs):
    procs = [Process(target=get_calls()[i], args=(,), 
    kwargs={}) for i in range(3)]
    _start = [proc.start() for proc in procs]
    _join = [proc.join() for proc in procs]

# 
transaction.on_commit(lambda: task.delay()) 

但是,我运行变成了一个AssertionError:守护进程,不允许有子进程。使用附加进程加速 celery 任务的最佳方法是什么?

Celery worker 已经创建了许多进程。利用许多工作进程而不是创建子进程。您可以将工作委托给芹菜工人。这将导致更多 stable/reliable 执行。

您可以只从客户端代码创建许多任务,也可以使用 celery 的原语,如 chains or chords to parallelize the work. These can also be composed with other primitives like groups

例如,在您的场景中,您可能有两项任务:一项是进行 API 调用 make_api_call,另一项是解析响应 parse_response。您可以 将它们 链接在一起。

# chain another task when a task completes successfully
res = make_api_call.apply_async((0,), link=parse_response.s())

# chain syntax 1
result_1 = chain(make_api_call.s(1), parse_response.s())
# syntax 2 with | operator
result_b = make_api_call.s(2) | parse_response.s()

# can group chains
job = group([
   chain(make_api_call.s(i), parse_response.s()) 
   for i in range(3)
   ]
)
result = job.apply_async()

这只是一个通用示例。您可以创建任务并将它们组合成您的工作流需要。请参阅:Canvas: Designing Work-flows 了解更多信息。