如何并行化芹菜中的子任务
How to parallelize subtasks in celery
我有以下代码。这工作正常。但是我在 get_host_types()
中的 for 循环中迭代指标。我想从 get_host_types()
函数为每个将调用 celery 任务 get_host_type()
的指标创建子任务。这将允许子任务在工作节点上独立运行。我想等待方法 get_host_types()
中的结果并将其返回。我正在考虑使用 group()
。但是我无法在 AsyncResult()
上调用 .get()
。如果我不并行化,我就没有利用分布式任务框架来加速主任务。
from __future__ import print_function
from celery import Celery, group
import requests
app = Celery('celery_test')
app.config_from_object('config')
@app.task
def get_host_type(metric, alert):
    """Resolve the host types for a single metric of the given alert dict."""
    # Delegates straight to the mapper; one task invocation per metric.
    return get_host_types_for_alert(alert['alert_id'], metric)
class MyObject(dict):
    """Dict-backed container pairing an alert with its parsed host types.

    Subclassing dict keeps the object JSON-serializable for Celery's
    default serializer.
    """

    def __init__(self, alert, host_types):
        super().__init__(alert=alert, host_types=host_types)
@app.task(serializer='json')
def get_host_types(my_obj):
    """Collect the de-duplicated host types for every metric of an alert.

    Expects ``my_obj`` shaped like ``{'alert': {'alert': {...}}, 'host_types': [metrics]}``
    (as produced by get_metrics). Returns a list built from a set, so order
    is not guaranteed.
    """
    print(f"alert get_host_types ============> {my_obj}")
    alert = my_obj['alert']['alert']
    metrics = my_obj['host_types']
    # Union of the mapper's results across all metrics, duplicates removed.
    ret_val = {
        host_type
        for metric in metrics
        for host_type in alert_id_host_type_mapper.get_host_types_for_alert(alert['id'], metric)
    }
    print(f"Return value ======> {ret_val}")
    return list(ret_val)
@app.task
def get_metrics(alert):
    """Fetch the raw metric for an alert and wrap it with its parsed host types."""
    print(f" alert ==> {alert}")
    raw_metric = alerts_client.get_metrics(alert['alert'])
    parsed_metrics = alert_id_host_type_mapper.metric_parser(raw_metric)
    return MyObject(alert, parsed_metrics)
@app.task
def get_alert(alert_id):
    """Look up and return the alert payload for the given alert id."""
    print(f" =====> alert id {alert_id}")
    result = alerts_client.get_alerts(alert_id)
    return result
if __name__ == "__main__":
    # Chain the three tasks: alert lookup -> metric parsing -> host-type mapping.
    workflow = get_alert.s(267483) | get_metrics.s() | get_host_types.s()
    async_result = workflow.apply_async()
    print(async_result.get())
编辑:
如果我在子任务中执行 result.get()
,我会收到以下错误。
[2022-02-10 19:36:03,904: WARNING/ForkPoolWorker-42] Exception in thread Thread-7:
Traceback (most recent call last):
File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/home/sshil/code/statsboard/statsboard/celery_test/celery_test.py", line 30, in run
result_int = res.get()
File "/home/sshil/venv/sb_venv/lib/python3.6/site-packages/celery/result.py", line 680, in get
on_interval=on_interval,
File "/home/sshil/venv/sb_venv/lib/python3.6/site-packages/celery/result.py", line 793, in join_native
assert_will_not_block()
File "/home/sshil/venv/sb_venv/lib/python3.6/site-packages/celery/result.py", line 37, in assert_will_not_block
raise RuntimeError(E_WOULDBLOCK)
RuntimeError: Never call result.get() within a task!
See http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks
你可以像下面这样使用 Celery Group
@app.task(serializer='json')
def get_host_types(my_obj):
    """Fan out one get_host_type subtask per metric and gather the results.

    Expects ``my_obj`` shaped like ``{'alert': {'alert': {...}}, 'host_types': [metrics]}``.
    Returns the list of per-metric results collected from the group.

    WARNING: blocking on ``result.get()`` inside a task — even with
    ``disable_sync_subtasks=False`` — can deadlock if the worker pool is
    exhausted waiting on its own subtasks. The pattern Celery documents
    for "fan out then combine" is ``chord(group, callback)``; this version
    keeps the synchronous wait only to preserve the original interface.
    """
    print(f"alert get_host_types ============> {my_obj}")
    alert = my_obj['alert']['alert']
    metrics = my_obj['host_types']
    # One signature per metric; the group runs them in parallel on workers.
    tasks = [get_host_type.s(m, alert['id']) for m in metrics]
    # create a group with all the tasks
    job = group(tasks)
    result = job.apply_async()
    # disable_sync_subtasks=False overrides Celery's guard against calling
    # .get() inside a task (the RuntimeError from the question).
    ret_val = result.get(disable_sync_subtasks=False)
    return ret_val
有关 Celery Group 的更多信息,请参阅 ->
http://ask.github.io/celery/userguide/groups.html#groups
我有以下代码。这工作正常。但是我在 get_host_types()
中的 for 循环中迭代指标。我想从 get_host_types()
函数为每个将调用 celery 任务 get_host_type()
的指标创建子任务。这将允许子任务在工作节点上独立运行。我想等待方法 get_host_types()
中的结果并将其返回。我正在考虑使用 group()
。但是我无法在 AsyncResult()
上调用 .get()
。如果我不并行化,我就没有利用分布式任务框架来加速主任务。
from __future__ import print_function
from celery import Celery, group
import requests
app = Celery('celery_test')
app.config_from_object('config')
@app.task
def get_host_type(metric, alert):
    """Resolve the host types for a single metric of the given alert dict."""
    # Delegates straight to the mapper; one task invocation per metric.
    return get_host_types_for_alert(alert['alert_id'], metric)
class MyObject(dict):
    """Dict-backed container pairing an alert with its parsed host types.

    Subclassing dict keeps the object JSON-serializable for Celery's
    default serializer.
    """

    def __init__(self, alert, host_types):
        super().__init__(alert=alert, host_types=host_types)
@app.task(serializer='json')
def get_host_types(my_obj):
    """Collect the de-duplicated host types for every metric of an alert.

    Expects ``my_obj`` shaped like ``{'alert': {'alert': {...}}, 'host_types': [metrics]}``
    (as produced by get_metrics). Returns a list built from a set, so order
    is not guaranteed.
    """
    print(f"alert get_host_types ============> {my_obj}")
    alert = my_obj['alert']['alert']
    metrics = my_obj['host_types']
    # Union of the mapper's results across all metrics, duplicates removed.
    ret_val = {
        host_type
        for metric in metrics
        for host_type in alert_id_host_type_mapper.get_host_types_for_alert(alert['id'], metric)
    }
    print(f"Return value ======> {ret_val}")
    return list(ret_val)
@app.task
def get_metrics(alert):
    """Fetch the raw metric for an alert and wrap it with its parsed host types."""
    print(f" alert ==> {alert}")
    raw_metric = alerts_client.get_metrics(alert['alert'])
    parsed_metrics = alert_id_host_type_mapper.metric_parser(raw_metric)
    return MyObject(alert, parsed_metrics)
@app.task
def get_alert(alert_id):
    """Look up and return the alert payload for the given alert id."""
    print(f" =====> alert id {alert_id}")
    result = alerts_client.get_alerts(alert_id)
    return result
if __name__ == "__main__":
    # Chain the three tasks: alert lookup -> metric parsing -> host-type mapping.
    workflow = get_alert.s(267483) | get_metrics.s() | get_host_types.s()
    async_result = workflow.apply_async()
    print(async_result.get())
编辑:
如果我在子任务中执行 result.get()
,我会收到以下错误。
[2022-02-10 19:36:03,904: WARNING/ForkPoolWorker-42] Exception in thread Thread-7:
Traceback (most recent call last):
File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/home/sshil/code/statsboard/statsboard/celery_test/celery_test.py", line 30, in run
result_int = res.get()
File "/home/sshil/venv/sb_venv/lib/python3.6/site-packages/celery/result.py", line 680, in get
on_interval=on_interval,
File "/home/sshil/venv/sb_venv/lib/python3.6/site-packages/celery/result.py", line 793, in join_native
assert_will_not_block()
File "/home/sshil/venv/sb_venv/lib/python3.6/site-packages/celery/result.py", line 37, in assert_will_not_block
raise RuntimeError(E_WOULDBLOCK)
RuntimeError: Never call result.get() within a task!
See http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks
你可以像下面这样使用 Celery Group
@app.task(serializer='json')
def get_host_types(my_obj):
    """Fan out one get_host_type subtask per metric and gather the results.

    Expects ``my_obj`` shaped like ``{'alert': {'alert': {...}}, 'host_types': [metrics]}``.
    Returns the list of per-metric results collected from the group.

    WARNING: blocking on ``result.get()`` inside a task — even with
    ``disable_sync_subtasks=False`` — can deadlock if the worker pool is
    exhausted waiting on its own subtasks. The pattern Celery documents
    for "fan out then combine" is ``chord(group, callback)``; this version
    keeps the synchronous wait only to preserve the original interface.
    """
    print(f"alert get_host_types ============> {my_obj}")
    alert = my_obj['alert']['alert']
    metrics = my_obj['host_types']
    # One signature per metric; the group runs them in parallel on workers.
    tasks = [get_host_type.s(m, alert['id']) for m in metrics]
    # create a group with all the tasks
    job = group(tasks)
    result = job.apply_async()
    # disable_sync_subtasks=False overrides Celery's guard against calling
    # .get() inside a task (the RuntimeError from the question).
    ret_val = result.get(disable_sync_subtasks=False)
    return ret_val
有关 Celery Group 的更多信息,请参阅 -> http://ask.github.io/celery/userguide/groups.html#groups