How to parallelize subtasks in celery

I have the following code, and it works fine. However, in get_host_types() I iterate over the metrics in a for loop. I would like get_host_types() to create a subtask for each metric, each calling the celery task get_host_type(), so that the subtasks run independently on the worker nodes. I want to wait for those results inside get_host_types() and return them. I was thinking of using group(), but I cannot call .get() on the AsyncResult(). If I don't parallelize this, I'm not taking advantage of the distributed task framework to speed up the main task.

from __future__ import print_function


from celery import Celery, group
import requests

app = Celery('celery_test')

app.config_from_object('config')

@app.task
def get_host_type(metric, alert_id):
    # maps a single metric to its host types; meant to run as an
    # independent subtask on a worker
    host_types = alert_id_host_type_mapper.get_host_types_for_alert(alert_id, metric)
    return host_types

class MyObject(dict):
    # simple dict wrapper; the 'host_types' key carries the list of
    # metrics produced by get_metrics()
    def __init__(self, alert, host_types):
        dict.__init__(self, alert=alert, host_types=host_types)


@app.task(serializer='json')
def get_host_types(my_obj):
    print(f"alert get_host_types ============> {my_obj}")
    alert = my_obj['alert']['alert']
    metrics = my_obj['host_types']
    ret_val = set()
    # this is the loop I want to fan out into parallel subtasks
    for m in metrics:
        res = alert_id_host_type_mapper.get_host_types_for_alert(alert['id'], m)
        ret_val.update(res)
    print(f"Return value ======> {ret_val}")
    return list(ret_val)


@app.task
def get_metrics(alert):
    print(f" alert ==> {alert}")
    #alert = alert[0]
    metric = alerts_client.get_metrics(alert['alert'])
    metrics = alert_id_host_type_mapper.metric_parser(metric)
    return MyObject(alert, metrics)


@app.task
def get_alert(alert_id):
    print(f" =====> alert id {alert_id}")
    return alerts_client.get_alerts(alert_id)


if __name__ == "__main__":
    # chain: fetch the alert, extract its metrics, then map them to host types
    res = (get_alert.s(267483) | get_metrics.s() | get_host_types.s()).apply_async()
    print(res.get())

Edit: If I call result.get() inside a subtask, I get the following error.

[2022-02-10 19:36:03,904: WARNING/ForkPoolWorker-42] Exception in thread Thread-7:
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/sshil/code/statsboard/statsboard/celery_test/celery_test.py", line 30, in run
    result_int = res.get()
  File "/home/sshil/venv/sb_venv/lib/python3.6/site-packages/celery/result.py", line 680, in get
    on_interval=on_interval,
  File "/home/sshil/venv/sb_venv/lib/python3.6/site-packages/celery/result.py", line 793, in join_native
    assert_will_not_block()
  File "/home/sshil/venv/sb_venv/lib/python3.6/site-packages/celery/result.py", line 37, in assert_will_not_block
    raise RuntimeError(E_WOULDBLOCK)
RuntimeError: Never call result.get() within a task!
See http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks

You can use a Celery group as shown below:

@app.task(serializer='json')
def get_host_types(my_obj):
    print(f"alert get_host_types ============> {my_obj}")
    alert = my_obj['alert']['alert']
    metrics = my_obj['host_types']
    # build one get_host_type subtask signature per metric
    tasks = [get_host_type.s(m, alert['id']) for m in metrics]
    # create a group with all the tasks and run them in parallel on the workers
    job = group(tasks)
    result = job.apply_async()
    # wait for all subtasks; get() is normally forbidden inside a task,
    # so the sync-subtask check has to be disabled explicitly.
    # ret_val is a list with one result (list of host types) per metric
    ret_val = result.get(disable_sync_subtasks=False)
    return ret_val
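
Note that .get() is disallowed inside a task by default because a worker blocking on its own subtasks can exhaust the pool and deadlock; disable_sync_subtasks=False only bypasses that check, so make sure enough worker processes are available for the subtasks to actually run.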

For more information on Celery groups, see -> http://ask.github.io/celery/userguide/groups.html#groups
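
If you would rather not block inside the task at all, a chord is the usual non-blocking alternative: the group's results are delivered to a callback task instead of being waited on. A minimal sketch, assuming Celery 4+ (for Task.replace), a result backend that supports chords, and a hypothetical merge_host_types callback that deduplicates the per-metric results:

from celery import chord

@app.task
def merge_host_types(results):
    # results is a list with one get_host_type return value per metric
    merged = set()
    for host_types in results:
        merged.update(host_types)
    return list(merged)

@app.task(bind=True, serializer='json')
def get_host_types(self, my_obj):
    alert = my_obj['alert']['alert']
    metrics = my_obj['host_types']
    header = [get_host_type.s(m, alert['id']) for m in metrics]
    # replace this task with a chord: the subtasks run in parallel and
    # merge_host_types receives their collected results, so nothing blocks
    return self.replace(chord(header, merge_host_types.s()))

With this variant the chain in __main__ stays the same; get_host_types finishes as soon as the chord is dispatched, and the merged list from merge_host_types should become the chain's final result.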