How can I run multiple Celery tasks in parallel (using group)?
I am new to Celery. I want to run demo_task in parallel, but it runs the tasks sequentially instead of in parallel. Please let me know if I'm doing something wrong.
import time

import pandas as pd
from celery import Celery
from celery import chain, group, chord, chunks

CONFIG = {
    'BROKER_URL': 'redis://localhost:6379/0',
    'CELERY_RESULT_BACKEND': 'redis://localhost:6379/0',
}

app = Celery()
app.config_from_object(CONFIG)


@app.task(name='demo_task')
def demo_task(x, y):
    print("demo_task", x, y)
    pd.DataFrame({"a": [1, 2, 3], "b": [2, 3, 4]}).to_csv(f"demo{x}.csv", index=False)
    print("saved")
    time.sleep(8)


def run_task():
    print("start chain_call")
    t = group(*[demo_task.signature((3, 3)),
                demo_task.signature((4, 4)),
                demo_task.signature((5, 5))]
              ).apply_async()


if __name__ == '__main__':
    run_task()
[Command]
celery -A celery_demo worker -l info --pool=solo --purge
[Log]
[2022-04-22 16:29:51,668: WARNING/MainProcess] Please run `celery upgrade settings path/to/settings.py` to avoid these warnings and to allow a smoother upgrade to Celery 6.0.
[2022-04-22 16:29:51,668: INFO/MainProcess] Connected to redis://localhost:6379/0
[2022-04-22 16:29:51,668: INFO/MainProcess] mingle: searching for neighbors
[2022-04-22 16:29:52,672: INFO/MainProcess] mingle: all alone
[2022-04-22 16:30:05,602: WARNING/MainProcess]
[2022-04-22 16:30:05,602: WARNING/MainProcess] 4
[2022-04-22 16:30:05,602: WARNING/MainProcess]
[2022-04-22 16:30:05,602: WARNING/MainProcess] 4
[2022-04-22 16:30:05,602: WARNING/MainProcess] saved
[2022-04-22 16:30:13,614: INFO/MainProcess] Task demo_task[c017c03e-b49d-4d54-85c5-4af57dd55908] succeeded in 8.016000000061467s: None
[2022-04-22 16:30:13,614: INFO/MainProcess] Task demo_task[d60071c6-4332-4ec1-88fd-3fce79c06ab5] received
[2022-04-22 16:30:13,614: WARNING/MainProcess] demo_task
[2022-04-22 16:30:13,614: WARNING/MainProcess]
[2022-04-22 16:30:13,614: WARNING/MainProcess] 5
[2022-04-22 16:30:13,614: WARNING/MainProcess]
[2022-04-22 16:30:13,614: WARNING/MainProcess] 5
[2022-04-22 16:30:13,614: WARNING/MainProcess] saved
[2022-04-22 16:30:21,634: INFO/MainProcess] Task demo_task[d60071c6-4332-4ec1-88fd-3fce79c06ab5] succeeded in 8.015000000130385s: None
Modify your run_task function:
async def run_task():
    print("start chain_call")
    t = await group(*[demo_task.signature((3, 3)),
                      demo_task.signature((4, 4)),
                      demo_task.signature((5, 5))]
                    ).apply_async()
How do you expect the tasks to run in parallel when you use the "solo" pool? With solo, the worker executes one task at a time in its main process.
Instead, start with prefork concurrency (the default): celery -A celery_demo worker -l info -c 8
This makes the Celery worker spawn 8 worker processes that can execute tasks in parallel. If your machine has more than 8 cores, you can increase that number from 8 to N, where N is the number of cores available on the host. I always go with N-1 to leave the system one spare core for other things.
Prefork concurrency is well suited to CPU-bound tasks. If your tasks are mostly I/O-bound, try the "gevent" or "eventlet" concurrency types instead.