如何实现多处理 python 装饰器?
How to implement a multiprocessing python decorator?
我想编写一个包装器来调用 asyncio 中要求 CPU 的函数。
我希望它像这样使用:
@cpu_bound
def fact(x: int):
res: int = 1
while x != 1:
res *= x
x -= 1
return res
async def foo(x: int):
res = await fact(x)
...
一开始我是这样写的:
def cpu_bound(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
@functools.wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
executor = get_executor() # This is a part where I implemented myself.
return await loop.run_in_executor(
executor, functools.partial(func, *args, **kwargs)
)
return wrapper
但是,我遇到了酸洗问题。
Traceback (most recent call last): File
"C:\Users\Lenovo\AppData\Local\Programs\Python\Python39\lib\multiprocessing\queues.py",
line 245, in _feed
obj = _ForkingPickler.dumps(obj) File "C:\Users\Lenovo\AppData\Local\Programs\Python\Python39\lib\multiprocessing\reduction.py",
line 51, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function fact at 0x000001C2D7D40820>: it's not the same object as main.fact
也许原始函数和包装函数不相同 id
是问题所在?
那么,有没有办法编写这样的包装器?
我知道我可以使用 loop.run_in_executor
,但拥有这样的包装器会有很大帮助。
Maybe the original function and the wrapped one not having the same id is the problem?
在某种程度上,是的。在函数被发送到目标进程之前,它被腌制,在你的情况下失败,因为装饰器范围内的 func
对象与主模块中的 fact
对象不同,在装饰器重新绑定之后.
查看 this and this 问题了解一些背景知识。
基于这些答案,我创建了一个示例来说明如何实现您想要的。
诀窍是创建一个可挑选的“运行ner”函数,目标进程可以使用它从某种注册表(例如字典...)中查找您的原始函数并 运行 它。这当然只是一个例子。您可能不想在装饰器中创建 ProcessPoolExecutor
。
import asyncio
from concurrent.futures import ProcessPoolExecutor
import functools
original_functions={}
def func_runner(name, *args):
return original_functions[name](*args)
def cpu_bound(func):
original_functions[func.__name__]=func
@functools.wraps(func)
async def wrapper(*args):
with ProcessPoolExecutor(1) as pool:
res = await asyncio.get_running_loop().run_in_executor(
pool, functools.partial(func_runner, func.__name__, *args)
)
return res
return wrapper
@cpu_bound
def fact(arg):
return arg
async def foo():
res = await fact("ok")
print(res)
if __name__ == "__main__":
asyncio.run(foo())
我想编写一个包装器来调用 asyncio 中要求 CPU 的函数。
我希望它像这样使用:
@cpu_bound
def fact(x: int):
res: int = 1
while x != 1:
res *= x
x -= 1
return res
async def foo(x: int):
res = await fact(x)
...
一开始我是这样写的:
def cpu_bound(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
@functools.wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
executor = get_executor() # This is a part where I implemented myself.
return await loop.run_in_executor(
executor, functools.partial(func, *args, **kwargs)
)
return wrapper
但是,我遇到了酸洗问题。
Traceback (most recent call last): File "C:\Users\Lenovo\AppData\Local\Programs\Python\Python39\lib\multiprocessing\queues.py", line 245, in _feed obj = _ForkingPickler.dumps(obj) File "C:\Users\Lenovo\AppData\Local\Programs\Python\Python39\lib\multiprocessing\reduction.py", line 51, in dumps cls(buf, protocol).dump(obj) _pickle.PicklingError: Can't pickle <function fact at 0x000001C2D7D40820>: it's not the same object as main.fact
也许原始函数和包装函数不相同 id
是问题所在?
那么,有没有办法编写这样的包装器?
我知道我可以使用 loop.run_in_executor
,但拥有这样的包装器会有很大帮助。
Maybe the original function and the wrapped one not having the same id is the problem?
在某种程度上,是的。在函数被发送到目标进程之前,它被腌制,在你的情况下失败,因为装饰器范围内的 func
对象与主模块中的 fact
对象不同,在装饰器重新绑定之后.
查看 this and this 问题了解一些背景知识。
基于这些答案,我创建了一个示例来说明如何实现您想要的。
诀窍是创建一个可挑选的“运行ner”函数,目标进程可以使用它从某种注册表(例如字典...)中查找您的原始函数并 运行 它。这当然只是一个例子。您可能不想在装饰器中创建 ProcessPoolExecutor
。
import asyncio
from concurrent.futures import ProcessPoolExecutor
import functools
original_functions={}
def func_runner(name, *args):
return original_functions[name](*args)
def cpu_bound(func):
original_functions[func.__name__]=func
@functools.wraps(func)
async def wrapper(*args):
with ProcessPoolExecutor(1) as pool:
res = await asyncio.get_running_loop().run_in_executor(
pool, functools.partial(func_runner, func.__name__, *args)
)
return res
return wrapper
@cpu_bound
def fact(arg):
return arg
async def foo():
res = await fact("ok")
print(res)
if __name__ == "__main__":
asyncio.run(foo())