为多处理酸洗双重装饰功能

Pickling doubly-decorated functions for Multiprocessing

我正在尝试编写一些样板代码来简化常见的工作流程: 给定一些接受参数的函数 f 和这些参数的列表,return 一个创建队列和进程、传递工作并最终将队列中的结果收集到列表中的新函数

我意识到这与 map 所做的非常相似,但这应该更灵活(尽管与 MWE 无关。

问题是 pickle 修饰函数不起作用,即使在我使用 functools.wraps 之后也是如此。

当我通过传入函数的“排队”版本而不是在 process_wrapper 中创建它进行测试时,代码 运行 没有问题。因此,我认为问题是函数 f 被“双重装饰”,但我不知道如何解决这个问题。

import multiprocessing as mp
from multiprocessing.queues import Queue
from typing import Callable
from functools import wraps

def func_wrapper(f: Callable) -> Callable:
    @wraps(f)
    def q_f(q_in, q_out):
        while not q_in.empty():
            args = q_in.get()
            out = f(*args)
            q_out.put(out)
        return
    return q_f

def process_wrapper(f: Callable, num_process: int = 10) -> Callable:

    f = func_wrapper(f)
    @wraps(f)
    def ff(ls) -> list:
        manager = mp.Manager()
        q_in = manager.Queue()
        q_out = manager.Queue()

        [q_in.put(x) for x in ls]

        # queue_func = f

        processes = []
        for _ in range(num_process):
            p = mp.Process(target = f, args = (q_in, q_out))
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

        output = []
        while not q_out.empty():
            output.append(q_out.get())

        return output
    return ff

if __name__ == "__main__":

    def f(x):
        print(x)
    f = process_wrapper(f)
        

    list_args = [(x,) for x in range(100)]

    f(list_args)

我最终使用了 multiprocess 而不是 multiprocessing,它解决了问题。它与序列化 pickledill 的限制有关