Effectively save instance attribute with nested multiprocessing Pools/Processes

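I have two custom Python classes. The first has a method that does some computation (using a Pool) and creates a new instance attribute. The second aggregates two objects of the first class and has a method that I want to use to dispatch that computation to both first-class objects (also in parallel) and correctly save their new instance attributes.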

Dummy code:

from multiprocessing import Pool, Process

# `function`, `args`, `data1` and `data2` are placeholders from the real project.

class State:
    def __init__(self, data):
        self.data = data

    def calculate(self):
        with Pool() as p:
            p.map(function, args)
        new_attribute = ...  # some code that reads the files generated with the Pool
        self.new_attribute = new_attribute


class Pair:
    def __init__(self, state1: State, state2: State):
        self.state1 = state1
        self.state2 = state2

    def calculate_states(self):
        for state in [self.state1, self.state2]:
            # Each State.calculate runs in its own child process
            p = Process(target=state.calculate)
            p.start()


state1 = State(data1)
state2 = State(data2)
pair = Pair(state1, state2)
pair.calculate_states()
# state1.new_attribute / state2.new_attribute are not set here in the parent

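The problem, as I found during extensive research on the issue, is that multiprocessing.Process creates a copy of the namespace the process works in, and the values are not returned to the main namespace. Setting process.daemon to True produces an error, because "daemonic processes are not allowed to have children", and the same thing happens when I swap the Process for an extra Pool. Using multiprocess (instead of multiprocessing) or concurrent.futures doesn't seem to work either. Also, I don't understand how multiprocessing.Queue works, and I'm not sure whether it can be applied here (I read somewhere that it can).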
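A minimal sketch of the behaviour described above, and of one way a multiprocessing.Queue could carry the computed value back. The `State.calculate` body and the helper names `calculate_in_child`, `_worker` and `calculate_via_queue` are hypothetical stand-ins, not the real computation. The child sets the attribute on its own copy of the object, so the parent only sees it if the value is sent back explicitly and assigned in the parent.

```python
from multiprocessing import Process, Queue


class State:
    def __init__(self, data):
        self.data = data

    def calculate(self):
        # Hypothetical stand-in for the real Pool-based computation.
        self.new_attribute = self.data * 10


def calculate_in_child(state):
    """Run calculate() in a child process and report whether the
    attribute became visible on the parent's object."""
    p = Process(target=state.calculate)
    p.start()
    p.join()
    # The child modified its own copy of `state`, not this one.
    return hasattr(state, 'new_attribute')


def _worker(index, state, queue):
    # Runs in the child: compute, then send the value (not the object) back.
    state.calculate()
    queue.put((index, state.new_attribute))


def calculate_via_queue(states):
    queue = Queue()
    processes = [Process(target=_worker, args=(i, s, queue))
                 for i, s in enumerate(states)]
    for p in processes:
        p.start()
    # Drain the queue before joining, so no child blocks while flushing it.
    for _ in states:
        index, value = queue.get()
        # Apply the result to the parent's own object.
        states[index].new_attribute = value
    for p in processes:
        p.join()


if __name__ == '__main__':
    print(calculate_in_child(State(1)))  # False
    states = [State(1), State(2)]
    calculate_via_queue(states)
    print([s.new_attribute for s in states])  # [10, 20]
```

The `(index, value)` tuples are used because results can arrive in any order; the index tells the parent which object to update.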

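I'd like to achieve this without having to pass a shared-memory object to the processes (writing new_attribute into it and then applying it to the states in the main namespace). Even though I haven't provided a working/reproducible example, I hope someone can point me to a solution.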

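Your problem is caused by running the method calculate as a new child process. You don't need to do that: you can still compute the new attributes in parallel by using map_async with the callback argument. The callback runs in the main process, so the attribute it sets is saved on the original object.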

I have taken your code and supplied implementations of the missing functions to demonstrate:

from multiprocessing import Pool, cpu_count

def some_code(data):
    if data == 1:
        return 1032
    if data == 2:
        return 9874
    raise ValueError('Invalid data value:', data)

def function(val):
    ...
    # return value is not of interest

class State:
    def __init__(self, data):
        self.data = data

    def calculate(self, pool, args):
        pool.map_async(function, args, callback=self.callback)

    def callback(self, result):
        """
        Called when map_async completes
        """
        new_attribute = some_code(self.data)
        self.new_attribute = new_attribute


class Pair:
    def __init__(self, state1:State, state2:State):
        self.state1 = state1
        self.state2 = state2

    def calculate_states(self):
        args = (6, 9, 18)
        # Assumption is computation is VERY CPU-intensive
        # If there is quite a bit of I/O involved then: pool_size = 2 * len(args)
        # If it's mostly I/O you should have been using multithreading to begin with
        pool_size = min(2*len(args), cpu_count())
        with Pool(pool_size) as pool:
            for state in [self.state1, self.state2]:
                state.calculate(pool, args)
            # wait for tasks to complete
            pool.close()
            pool.join()


# Required for Windows:
if __name__ == '__main__':
    data1 = 1
    data2 = 2

    state1 = State(data1)
    state2 = State(data2)
    pair = Pair(state1, state2)
    pair.calculate_states()
    print(state1.new_attribute, state2.new_attribute)

Prints:

1032 9874
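One caveat with the approach above: `callback` is only invoked if every task succeeds. If `function` raises, the callback is skipped, and because the AsyncResult returned by map_async is discarded, the exception goes unnoticed. map_async also accepts an `error_callback`, which is called in the main process with the exception instead. A minimal sketch, assuming a hypothetical `function` that fails on negative input and a hypothetical `error` attribute on State:

```python
from multiprocessing import Pool


def function(val):
    # Hypothetical worker: rejects negative input to simulate a failure.
    if val < 0:
        raise ValueError(f'negative value: {val}')
    return val * 2


class State:
    def __init__(self, data):
        self.data = data
        self.error = None  # hypothetical attribute for recording failures

    def calculate(self, pool, args):
        pool.map_async(function, args,
                       callback=self.callback,
                       error_callback=self.error_callback)

    def callback(self, result):
        # Called in the main process once every task has succeeded.
        self.new_attribute = sum(result)

    def error_callback(self, exc):
        # Called in the main process (instead of callback) if any task raised.
        self.error = exc


def calculate_states(jobs):
    with Pool(2) as pool:
        for state, args in jobs:
            state.calculate(pool, args)
        pool.close()
        pool.join()  # all callbacks have run once join() returns


if __name__ == '__main__':
    ok, bad = State(1), State(2)
    calculate_states([(ok, (1, 2, 3)), (bad, (4, -5))])
    print(ok.new_attribute, ok.error)  # 12 None
    print(hasattr(bad, 'new_attribute'), repr(bad.error))
```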