使用任何流行的 python 网络服务器进行多处理

Multiprocessing with any popular python webserver

我使用过许多 python 网络服务器,包括标准 http.serverflasktornadodashtwistedcherryPi。我还阅读了 django。其中的 Afaict none 与真正的多线程有任何相似之处。以 django 为例,建议使用 celery,这是一个完全独立的基于队列的任务管理器。是的,我们总是可以求助于外部队列:但这意味着没有任何 native 更接近多线程(在进程中)。我非常了解GIL,但至少会寻求共享相同的代码——类似于fork c 程序.

一个想法是尝试使用 multiprocessing 库。事实上,有一个关于该方法的问答,接受的答案 。然而,该方法似乎是纯套接字 tcp/ip:它不包括重要的 Http 处理支持。这留下了太多需要重新实现的工作(包括轮子等圆形物体)。

有什么方法可以将 multiprocessing 库方法与可用的网络服务器库合并,例如 twisted , tornado 破折号等?否则我们如何使用它们有用的 http 处理能力?

更新 我们有多种工作负载

我们确实需要能够在给定机器上利用多个 cpu 来同时处理 tasks/workloads.

的混合

如果您需要多个 http 网络服务器来处理 http 请求,您可以使用 Gunicorn,它将您的应用程序的多个实例创建为子进程。

如果您有 CPU 个绑定的 OP,它们最终会阻止所有 http op,因此应该将它们分发给其他进程。因此,在启动时,您的每个 http 服务器都会创建几个执行繁重任务的子进程。

所以方案是Gunicorn->http服务器->CPU重进程

使用 aiohttp 的示例:

from aiohttp import web
import time
import multiprocessing as mp
from random import randint


def cpu_heavy_operation(num):
    """Just some CPU heavy task"""
    if num not in range(1, 10):
        return 0
    return str(num**1000000)[0:10]


def process_worker(q: mp.Queue, name: str):
    """Target function for mp.Process. Better convert it to class"""
    print(f"{name} Started worker process")
    while True:
        i = q.get()
        if i == "STOP":  # poison pill to stop child process gracefully
            break
        else:
            print(f"{name}: {cpu_heavy_operation(i)}")
    print(f"{name} Finished worker process")


async def add_another_worker_process(req: web.Request) -> web.Response:
    """Create another one child process"""
    q = req.app["cpu_bound_q"]
    name = randint(100000, 999999)
    pr = mp.Process(
        daemon=False,
        target=process_worker,
        args=(q, f"CPU-Bound_Pr-New-{name}",),
    )
    pr.start()
    req.app["children_pr"] += 1
    return web.json_response({"New": name, "Children": req.app["children_pr"]})


async def test_endpoint(req: web.Request) -> web.Response:
    """Just endpoint which feed child processes with tasks"""
    x = req.match_info.get("num")
    req.app["cpu_bound_q"].put(int(x))
    return web.json_response({"num": x})


async def stop_ops(app: web.Application) -> None:
    """To do graceful shutdowns"""
    for i in range(app["children_pr"]):
        app["cpu_bound_q"].put("STOP")

    time.sleep(30)  # give child processes chance to stop gracefully


async def init_func_standalone(args=None) -> web.Application:
    """Application factory for standalone run"""
    app = web.Application()
    app.router.add_get(r"/test/{num:\d+}", test_endpoint)
    app.router.add_get("/add", add_another_worker_process)

    # create cpu_bound_ops processes block
    cpu_bound_q = mp.Queue()
    prcs = [
        mp.Process(
            daemon=False,
            target=process_worker,
            args=(cpu_bound_q, f"CPU-Bound_Pr-{i}",),
        ) for i in range(4)
    ]
    [i.start() for i in prcs]
    app["children_pr"] = 4  # you should know how many children processes you need to stop gracefully
    app["cpu_bound_q"] = cpu_bound_q  # Queue for cpu bound ops - multiprocessing module

    app.on_cleanup.append(stop_ops)

    return app


async def init_func_gunicorn() -> web.Application:
    """is used to run aiohttp with Gunicorn"""
    app = await init_func_standalone()
    return app

if __name__ == '__main__':
    _app = init_func_standalone()
    web.run_app(_app, host='0.0.0.0', port=9999)

你看我multiprocessing,我这样做是因为我喜欢有更多的手动控制,其他选择是concurrent.futuresasynciorun_in_executor 方法。因此,只需创建池而不是将 CPU 繁重的任务发送到 run_in_executor,但在包装它们之前是 create_task asyncio 方法。