class 变量在多处理中变空 pool.map

class variable become empty in multiprocessing pool.map

我在 Utils class 中有一个 class 变量。

class Utils:
    _raw_data = defaultdict(list)

    @classmethod
    def raw_data(cls):
        return cls._raw_data.copy()

    @classmethod
    def set_raw_data(cls, key, data):
        cls._raw_data[key] = data

_raw_data 在被读取之前填充了键值对。

...
data = [ipaddress.IPv4Network(address) for address in ip_addresses]
Utils.set_raw_data(device_name, data)

但是当我尝试在 multiprocessing Pool.map 中执行一个从 Utils class 读取 raw_data 的函数时,它 returns 空列表。

这是父方法class

class Parent:
    ...
    def evaluate_without_prefix(self, devices):
        results = []
        print(Utils.raw_data())  <------ this print shows that the Utils.raw_data() is empty
        for network1, network2 in itertools.product(Utils.raw_data()[devices[0]], Utils.raw_data()[devices[1]]):
            if network1.subnet_of(network2):
                results.append((devices[0], network1, devices[1], network2))
            if network2.subnet_of(network1):
                results.append((devices[1], network2, devices[0], network1))

        return results

在子 class 中,我从父 class 执行方法,使用多处理池。

class Child(Parent):
    ...
    def execute(self):
        pool = Pool(os.cpu_count() - 1)
        devices = list(itertools.combinations(list(Utils.raw_data().keys()), 2))
        results = pool.map(super().evaluate_without_prefix, devices)
        return results

Parentclass里的print()显示raw_data()是空的,但是这个变量其实是有数据的,Childclass里的devices变量其实是从raw_data() 但是当它进入多处理池时,raw_data() 变为空。有什么原因吗?

问题好像如下:

在你的主进程中创建的class数据必须serialized/de-serialized使用pickle以便它可以从主进程的地址space传递到地址spaces 多处理池中需要使用这些对象的进程。但是有问题的 class 数据是 class Parent 的一个实例,因为您正在调用它的方法之一,即 valuate_without_prefix。但是在那个实例中没有任何地方有对 class Util 的引用或任何会导致多处理池序列化 Util class 和 Parent 的东西实例。因此,当该方法在任何进程中引用 class Util 时,将创建一个新的 Util,当然,它不会初始化其字典。

我认为最简单的改变是:

  1. 使属性 _raw_data 成为 instance 属性而不是 class 属性(顺便说一下,根据您当前的使用情况,不需要这是 defaultdict).
  2. 创建一个名为 util 的 class Util 实例并通过此引用初始化字典。
  3. 使用 multiprocessing.Pool 构造函数的 initializerinitargs 参数来初始化多处理池中的每个进程,使其具有名为 util 的全局变量将是主进程创建的 util 实例的副本。

所以我会按照以下几行来组织代码:

class Utils:

    def __init__(self):
        self._raw_data = {}

    def raw_data(self):
        # No need to make a copy ???
        return self._raw_data.copy()

    def set_raw_data(self, key, data):
        self._raw_data[key] = data

def init_processes(utils_instance):
    """
    Initialize each process in the process pool with global variable utils.
    """
    global utils
    utils = utils_instance

class Parent:
    ...
    def evaluate_without_prefix(self, devices):
        results = []
        print(utils.raw_data())
        for network1, network2 in itertools.product(utils.raw_data()[devices[0]], utils.raw_data()[devices[1]]):
            results.append([network1, network2])
        return results

class Child(Parent):
    ...
    def execute(self, utils):
        pool = Pool(os.cpu_count() - 1, initializer=init_processes, initargs=(utils,))
        # No need to make an explicit list (map will do that for you) ???
        devices = list(itertools.combinations(list(utils.raw_data().keys()), 2))
        results = pool.map(super().evaluate_without_prefix, devices)
        return results

def main():
    utils = Utils()
    # Initialize utils:   
    ...
    data = [ipaddress.IPv4Network(address) for address in ip_addresses]
    utils.set_raw_data(device_name, data)

    child = Child()
    results = child.execute(utils)

if __name__ == '__main__':
    main()

进一步说明

以下程序的主进程调用class方法Foo.set_x更新class属性x为10的值,然后创建多处理池并调用worker函数worker,它打印出 Foo.x.

的值

在 Windows 上,它使用 OS spawn 创建新进程,池中的进程在调用 worker 函数之前基本上通过启动新的 Python 解释器并重新执行源程序,在全局范围内执行每条语句。因此 Foo 的 class 定义是由编译它的 Python 解释器创建的;不涉及酸洗。但是 Foo.x 将是 0。

Linux 上的同一程序 运行,它使用 OS fork 创建新进程,继承了写时复制地址 space从主进程。因此,它将拥有 Foo class 的副本,因为它在创建多处理池时存在,并且 Foo.x 为 10。

我上面的解决方案是 Windows 平台,也适用于 Linux。当然,另一种方法是将 Util 实例作为附加参数传递给辅助函数,而不是使用池初始化程序,但这通常效率不高,因为池中的进程数通常少于worker 函数被调用的次数,因此池初始化方法需要的 pickling 更少。

from multiprocessing import Pool

class Foo:
    x = 0

    @classmethod
    def set_x(cls, x):
        cls.x = x

def worker():
    print(Foo.x)


if __name__ == '__main__':
    Foo.set_x(10)
    pool = Pool(1)
    pool.apply(worker)