为多处理酸洗双重装饰功能
Pickling doubly-decorated functions for Multiprocessing
我正在尝试编写一些样板代码来简化常见的工作流程:
给定一些接受参数的函数 f
和这些参数的列表,return 一个创建队列和进程、传递工作并最终将队列中的结果收集到列表中的新函数
我意识到这与 map
所做的非常相似,但这应该更灵活(尽管与 MWE 无关。
问题是 pickle 修饰函数不起作用,即使在我使用 functools.wraps
之后也是如此。
当我通过传入函数的“排队”版本而不是在 process_wrapper
中创建它进行测试时,代码 运行 没有问题。因此,我认为问题是函数 f
被“双重装饰”,但我不知道如何解决这个问题。
import multiprocessing as mp
from multiprocessing.queues import Queue
from typing import Callable
from functools import wraps
def func_wrapper(f: Callable) -> Callable:
@wraps(f)
def q_f(q_in, q_out):
while not q_in.empty():
args = q_in.get()
out = f(*args)
q_out.put(out)
return
return q_f
def process_wrapper(f: Callable, num_process: int = 10) -> Callable:
f = func_wrapper(f)
@wraps(f)
def ff(ls) -> list:
manager = mp.Manager()
q_in = manager.Queue()
q_out = manager.Queue()
[q_in.put(x) for x in ls]
# queue_func = f
processes = []
for _ in range(num_process):
p = mp.Process(target = f, args = (q_in, q_out))
processes.append(p)
p.start()
for p in processes:
p.join()
output = []
while not q_out.empty():
output.append(q_out.get())
return output
return ff
if __name__ == "__main__":
def f(x):
print(x)
f = process_wrapper(f)
list_args = [(x,) for x in range(100)]
f(list_args)
我最终使用了 multiprocess
而不是 multiprocessing
,它解决了问题。它与序列化 pickle
与 dill
的限制有关
我正在尝试编写一些样板代码来简化常见的工作流程:
给定一些接受参数的函数 f
和这些参数的列表,return 一个创建队列和进程、传递工作并最终将队列中的结果收集到列表中的新函数
我意识到这与 map
所做的非常相似,但这应该更灵活(尽管与 MWE 无关。
问题是 pickle 修饰函数不起作用,即使在我使用 functools.wraps
之后也是如此。
当我通过传入函数的“排队”版本而不是在 process_wrapper
中创建它进行测试时,代码 运行 没有问题。因此,我认为问题是函数 f
被“双重装饰”,但我不知道如何解决这个问题。
import multiprocessing as mp
from multiprocessing.queues import Queue
from typing import Callable
from functools import wraps
def func_wrapper(f: Callable) -> Callable:
@wraps(f)
def q_f(q_in, q_out):
while not q_in.empty():
args = q_in.get()
out = f(*args)
q_out.put(out)
return
return q_f
def process_wrapper(f: Callable, num_process: int = 10) -> Callable:
f = func_wrapper(f)
@wraps(f)
def ff(ls) -> list:
manager = mp.Manager()
q_in = manager.Queue()
q_out = manager.Queue()
[q_in.put(x) for x in ls]
# queue_func = f
processes = []
for _ in range(num_process):
p = mp.Process(target = f, args = (q_in, q_out))
processes.append(p)
p.start()
for p in processes:
p.join()
output = []
while not q_out.empty():
output.append(q_out.get())
return output
return ff
if __name__ == "__main__":
def f(x):
print(x)
f = process_wrapper(f)
list_args = [(x,) for x in range(100)]
f(list_args)
我最终使用了 multiprocess
而不是 multiprocessing
,它解决了问题。它与序列化 pickle
与 dill
的限制有关