使用任何流行的 python 网络服务器进行多处理
Multiprocessing with any popular python webserver
我使用过许多 python 网络服务器,包括标准 http.server、flask、tornado、dash、twisted 和 cherryPi。我还阅读了 django。其中的 Afaict none 与真正的多线程有任何相似之处。以 django 为例,建议使用 celery,这是一个完全独立的基于队列的任务管理器。是的,我们总是可以求助于外部队列:但这意味着没有任何 native 更接近多线程(在进程中)。我非常了解GIL,但至少会寻求共享相同的代码——类似于fork c 程序.
一个想法是尝试使用 multiprocessing 库。事实上,有一个关于该方法的问答,接受的答案 。然而,该方法似乎是纯套接字 tcp/ip:它不包括重要的 Http 处理支持。这留下了太多需要重新实现的工作(包括轮子等圆形物体)。
有什么方法可以将 multiprocessing 库方法与可用的网络服务器库合并,例如 twisted , tornado 、破折号等?否则我们如何使用它们有用的 http
处理能力?
更新 我们有多种工作负载
- small/quick 响应(亚毫秒 cpu):例如几个 RDBMS 调用
- 中等计算(两位数毫秒 cpu):例如。 encryption/decryption 个音频文件
- 大量计算(数百毫秒到个位数秒):例如音频和图像文件的信号处理
我们确实需要能够在给定机器上利用多个 cpu 来同时处理 tasks/workloads.
的混合
如果您需要多个 http 网络服务器来处理 http 请求,您可以使用 Gunicorn,它将您的应用程序的多个实例创建为子进程。
如果您有 CPU 个绑定的 OP,它们最终会阻止所有 http op,因此应该将它们分发给其他进程。因此,在启动时,您的每个 http 服务器都会创建几个执行繁重任务的子进程。
所以方案是Gunicorn->http服务器->CPU重进程
使用 aiohttp 的示例:
from aiohttp import web
import time
import multiprocessing as mp
from random import randint
def cpu_heavy_operation(num):
"""Just some CPU heavy task"""
if num not in range(1, 10):
return 0
return str(num**1000000)[0:10]
def process_worker(q: mp.Queue, name: str):
"""Target function for mp.Process. Better convert it to class"""
print(f"{name} Started worker process")
while True:
i = q.get()
if i == "STOP": # poison pill to stop child process gracefully
break
else:
print(f"{name}: {cpu_heavy_operation(i)}")
print(f"{name} Finished worker process")
async def add_another_worker_process(req: web.Request) -> web.Response:
"""Create another one child process"""
q = req.app["cpu_bound_q"]
name = randint(100000, 999999)
pr = mp.Process(
daemon=False,
target=process_worker,
args=(q, f"CPU-Bound_Pr-New-{name}",),
)
pr.start()
req.app["children_pr"] += 1
return web.json_response({"New": name, "Children": req.app["children_pr"]})
async def test_endpoint(req: web.Request) -> web.Response:
"""Just endpoint which feed child processes with tasks"""
x = req.match_info.get("num")
req.app["cpu_bound_q"].put(int(x))
return web.json_response({"num": x})
async def stop_ops(app: web.Application) -> None:
"""To do graceful shutdowns"""
for i in range(app["children_pr"]):
app["cpu_bound_q"].put("STOP")
time.sleep(30) # give child processes chance to stop gracefully
async def init_func_standalone(args=None) -> web.Application:
"""Application factory for standalone run"""
app = web.Application()
app.router.add_get(r"/test/{num:\d+}", test_endpoint)
app.router.add_get("/add", add_another_worker_process)
# create cpu_bound_ops processes block
cpu_bound_q = mp.Queue()
prcs = [
mp.Process(
daemon=False,
target=process_worker,
args=(cpu_bound_q, f"CPU-Bound_Pr-{i}",),
) for i in range(4)
]
[i.start() for i in prcs]
app["children_pr"] = 4 # you should know how many children processes you need to stop gracefully
app["cpu_bound_q"] = cpu_bound_q # Queue for cpu bound ops - multiprocessing module
app.on_cleanup.append(stop_ops)
return app
async def init_func_gunicorn() -> web.Application:
"""is used to run aiohttp with Gunicorn"""
app = await init_func_standalone()
return app
if __name__ == '__main__':
_app = init_func_standalone()
web.run_app(_app, host='0.0.0.0', port=9999)
你看我multiprocessing
,我这样做是因为我喜欢有更多的手动控制,其他选择是concurrent.futures
。 asyncio
有 run_in_executor
方法。因此,只需创建池而不是将 CPU 繁重的任务发送到 run_in_executor
,但在包装它们之前是 create_task
asyncio
方法。
我使用过许多 python 网络服务器,包括标准 http.server、flask、tornado、dash、twisted 和 cherryPi。我还阅读了 django。其中的 Afaict none 与真正的多线程有任何相似之处。以 django 为例,建议使用 celery,这是一个完全独立的基于队列的任务管理器。是的,我们总是可以求助于外部队列:但这意味着没有任何 native 更接近多线程(在进程中)。我非常了解GIL,但至少会寻求共享相同的代码——类似于fork c 程序.
一个想法是尝试使用 multiprocessing 库。事实上,有一个关于该方法的问答,接受的答案
有什么方法可以将 multiprocessing 库方法与可用的网络服务器库合并,例如 twisted , tornado 、破折号等?否则我们如何使用它们有用的 http
处理能力?
更新 我们有多种工作负载
- small/quick 响应(亚毫秒 cpu):例如几个 RDBMS 调用
- 中等计算(两位数毫秒 cpu):例如。 encryption/decryption 个音频文件
- 大量计算(数百毫秒到个位数秒):例如音频和图像文件的信号处理
我们确实需要能够在给定机器上利用多个 cpu 来同时处理 tasks/workloads.
的混合如果您需要多个 http 网络服务器来处理 http 请求,您可以使用 Gunicorn,它将您的应用程序的多个实例创建为子进程。
如果您有 CPU 个绑定的 OP,它们最终会阻止所有 http op,因此应该将它们分发给其他进程。因此,在启动时,您的每个 http 服务器都会创建几个执行繁重任务的子进程。
所以方案是Gunicorn->http服务器->CPU重进程
使用 aiohttp 的示例:
from aiohttp import web
import time
import multiprocessing as mp
from random import randint
def cpu_heavy_operation(num):
"""Just some CPU heavy task"""
if num not in range(1, 10):
return 0
return str(num**1000000)[0:10]
def process_worker(q: mp.Queue, name: str):
"""Target function for mp.Process. Better convert it to class"""
print(f"{name} Started worker process")
while True:
i = q.get()
if i == "STOP": # poison pill to stop child process gracefully
break
else:
print(f"{name}: {cpu_heavy_operation(i)}")
print(f"{name} Finished worker process")
async def add_another_worker_process(req: web.Request) -> web.Response:
"""Create another one child process"""
q = req.app["cpu_bound_q"]
name = randint(100000, 999999)
pr = mp.Process(
daemon=False,
target=process_worker,
args=(q, f"CPU-Bound_Pr-New-{name}",),
)
pr.start()
req.app["children_pr"] += 1
return web.json_response({"New": name, "Children": req.app["children_pr"]})
async def test_endpoint(req: web.Request) -> web.Response:
"""Just endpoint which feed child processes with tasks"""
x = req.match_info.get("num")
req.app["cpu_bound_q"].put(int(x))
return web.json_response({"num": x})
async def stop_ops(app: web.Application) -> None:
"""To do graceful shutdowns"""
for i in range(app["children_pr"]):
app["cpu_bound_q"].put("STOP")
time.sleep(30) # give child processes chance to stop gracefully
async def init_func_standalone(args=None) -> web.Application:
"""Application factory for standalone run"""
app = web.Application()
app.router.add_get(r"/test/{num:\d+}", test_endpoint)
app.router.add_get("/add", add_another_worker_process)
# create cpu_bound_ops processes block
cpu_bound_q = mp.Queue()
prcs = [
mp.Process(
daemon=False,
target=process_worker,
args=(cpu_bound_q, f"CPU-Bound_Pr-{i}",),
) for i in range(4)
]
[i.start() for i in prcs]
app["children_pr"] = 4 # you should know how many children processes you need to stop gracefully
app["cpu_bound_q"] = cpu_bound_q # Queue for cpu bound ops - multiprocessing module
app.on_cleanup.append(stop_ops)
return app
async def init_func_gunicorn() -> web.Application:
"""is used to run aiohttp with Gunicorn"""
app = await init_func_standalone()
return app
if __name__ == '__main__':
_app = init_func_standalone()
web.run_app(_app, host='0.0.0.0', port=9999)
你看我multiprocessing
,我这样做是因为我喜欢有更多的手动控制,其他选择是concurrent.futures
。 asyncio
有 run_in_executor
方法。因此,只需创建池而不是将 CPU 繁重的任务发送到 run_in_executor
,但在包装它们之前是 create_task
asyncio
方法。