class 变量在多处理中变空 pool.map
class variable become empty in multiprocessing pool.map
我在 Utils class 中有一个 class 变量。
class Utils:
_raw_data = defaultdict(list)
@classmethod
def raw_data(cls):
return cls._raw_data.copy()
@classmethod
def set_raw_data(cls, key, data):
cls._raw_data[key] = data
_raw_data 在被读取之前填充了键值对。
...
data = [ipaddress.IPv4Network(address) for address in ip_addresses]
Utils.set_raw_data(device_name, data)
但是当我尝试在 multiprocessing Pool.map 中执行一个从 Utils class 读取 raw_data 的函数时,它 returns 空列表。
这是父方法class
class Parent:
...
def evaluate_without_prefix(self, devices):
results = []
print(Utils.raw_data()) <------ this print shows that the Utils.raw_data() is empty
for network1, network2 in itertools.product(Utils.raw_data()[devices[0]], Utils.raw_data()[devices[1]]):
if network1.subnet_of(network2):
results.append((devices[0], network1, devices[1], network2))
if network2.subnet_of(network1):
results.append((devices[1], network2, devices[0], network1))
return results
在子 class 中,我从父 class 执行方法,使用多处理池。
class Child(Parent):
...
def execute(self):
pool = Pool(os.cpu_count() - 1)
devices = list(itertools.combinations(list(Utils.raw_data().keys()), 2))
results = pool.map(super().evaluate_without_prefix, devices)
return results
Parentclass里的print()显示raw_data()是空的,但是这个变量其实是有数据的,Childclass里的devices变量其实是从raw_data() 但是当它进入多处理池时,raw_data() 变为空。有什么原因吗?
问题好像如下:
在你的主进程中创建的class数据必须serialized/de-serialized使用pickle
以便它可以从主进程的地址space传递到地址spaces 多处理池中需要使用这些对象的进程。但是有问题的 class 数据是 class Parent
的一个实例,因为您正在调用它的方法之一,即 valuate_without_prefix
。但是在那个实例中没有任何地方有对 class Util
的引用或任何会导致多处理池序列化 Util
class 和 Parent
的东西实例。因此,当该方法在任何进程中引用 class Util
时,将创建一个新的 Util
,当然,它不会初始化其字典。
我认为最简单的改变是:
- 使属性
_raw_data
成为 instance 属性而不是 class 属性(顺便说一下,根据您当前的使用情况,不需要这是 defaultdict
).
- 创建一个名为
util
的 class Util
实例并通过此引用初始化字典。
- 使用
multiprocessing.Pool
构造函数的 initializer 和 initargs 参数来初始化多处理池中的每个进程,使其具有名为 util
的全局变量将是主进程创建的 util
实例的副本。
所以我会按照以下几行来组织代码:
class Utils:
def __init__(self):
self._raw_data = {}
def raw_data(self):
# No need to make a copy ???
return self._raw_data.copy()
def set_raw_data(self, key, data):
self._raw_data[key] = data
def init_processes(utils_instance):
"""
Initialize each process in the process pool with global variable utils.
"""
global utils
utils = utils_instance
class Parent:
...
def evaluate_without_prefix(self, devices):
results = []
print(utils.raw_data())
for network1, network2 in itertools.product(utils.raw_data()[devices[0]], utils.raw_data()[devices[1]]):
results.append([network1, network2])
return results
class Child(Parent):
...
def execute(self, utils):
pool = Pool(os.cpu_count() - 1, initializer=init_processes, initargs=(utils,))
# No need to make an explicit list (map will do that for you) ???
devices = list(itertools.combinations(list(utils.raw_data().keys()), 2))
results = pool.map(super().evaluate_without_prefix, devices)
return results
def main():
utils = Utils()
# Initialize utils:
...
data = [ipaddress.IPv4Network(address) for address in ip_addresses]
utils.set_raw_data(device_name, data)
child = Child()
results = child.execute(utils)
if __name__ == '__main__':
main()
进一步说明
以下程序的主进程调用class方法Foo.set_x
更新class属性x
为10的值,然后创建多处理池并调用worker函数worker
,它打印出 Foo.x
.
的值
在 Windows 上,它使用 OS spawn
创建新进程,池中的进程在调用 worker 函数之前基本上通过启动新的 Python 解释器并重新执行源程序,在全局范围内执行每条语句。因此 Foo
的 class 定义是由编译它的 Python 解释器创建的;不涉及酸洗。但是 Foo.x
将是 0。
Linux 上的同一程序 运行,它使用 OS fork
创建新进程,继承了写时复制地址 space从主进程。因此,它将拥有 Foo
class 的副本,因为它在创建多处理池时存在,并且 Foo.x
将 为 10。
我上面的解决方案是 Windows 平台,也适用于 Linux。当然,另一种方法是将 Util
实例作为附加参数传递给辅助函数,而不是使用池初始化程序,但这通常效率不高,因为池中的进程数通常少于worker 函数被调用的次数,因此池初始化方法需要的 pickling 更少。
from multiprocessing import Pool
class Foo:
x = 0
@classmethod
def set_x(cls, x):
cls.x = x
def worker():
print(Foo.x)
if __name__ == '__main__':
Foo.set_x(10)
pool = Pool(1)
pool.apply(worker)
我在 Utils class 中有一个 class 变量。
class Utils:
_raw_data = defaultdict(list)
@classmethod
def raw_data(cls):
return cls._raw_data.copy()
@classmethod
def set_raw_data(cls, key, data):
cls._raw_data[key] = data
_raw_data 在被读取之前填充了键值对。
...
data = [ipaddress.IPv4Network(address) for address in ip_addresses]
Utils.set_raw_data(device_name, data)
但是当我尝试在 multiprocessing Pool.map 中执行一个从 Utils class 读取 raw_data 的函数时,它 returns 空列表。
这是父方法class
class Parent:
...
def evaluate_without_prefix(self, devices):
results = []
print(Utils.raw_data()) <------ this print shows that the Utils.raw_data() is empty
for network1, network2 in itertools.product(Utils.raw_data()[devices[0]], Utils.raw_data()[devices[1]]):
if network1.subnet_of(network2):
results.append((devices[0], network1, devices[1], network2))
if network2.subnet_of(network1):
results.append((devices[1], network2, devices[0], network1))
return results
在子 class 中,我从父 class 执行方法,使用多处理池。
class Child(Parent):
...
def execute(self):
pool = Pool(os.cpu_count() - 1)
devices = list(itertools.combinations(list(Utils.raw_data().keys()), 2))
results = pool.map(super().evaluate_without_prefix, devices)
return results
Parentclass里的print()显示raw_data()是空的,但是这个变量其实是有数据的,Childclass里的devices变量其实是从raw_data() 但是当它进入多处理池时,raw_data() 变为空。有什么原因吗?
问题好像如下:
在你的主进程中创建的class数据必须serialized/de-serialized使用pickle
以便它可以从主进程的地址space传递到地址spaces 多处理池中需要使用这些对象的进程。但是有问题的 class 数据是 class Parent
的一个实例,因为您正在调用它的方法之一,即 valuate_without_prefix
。但是在那个实例中没有任何地方有对 class Util
的引用或任何会导致多处理池序列化 Util
class 和 Parent
的东西实例。因此,当该方法在任何进程中引用 class Util
时,将创建一个新的 Util
,当然,它不会初始化其字典。
我认为最简单的改变是:
- 使属性
_raw_data
成为 instance 属性而不是 class 属性(顺便说一下,根据您当前的使用情况,不需要这是defaultdict
). - 创建一个名为
util
的 classUtil
实例并通过此引用初始化字典。 - 使用
multiprocessing.Pool
构造函数的 initializer 和 initargs 参数来初始化多处理池中的每个进程,使其具有名为util
的全局变量将是主进程创建的util
实例的副本。
所以我会按照以下几行来组织代码:
class Utils:
def __init__(self):
self._raw_data = {}
def raw_data(self):
# No need to make a copy ???
return self._raw_data.copy()
def set_raw_data(self, key, data):
self._raw_data[key] = data
def init_processes(utils_instance):
"""
Initialize each process in the process pool with global variable utils.
"""
global utils
utils = utils_instance
class Parent:
...
def evaluate_without_prefix(self, devices):
results = []
print(utils.raw_data())
for network1, network2 in itertools.product(utils.raw_data()[devices[0]], utils.raw_data()[devices[1]]):
results.append([network1, network2])
return results
class Child(Parent):
...
def execute(self, utils):
pool = Pool(os.cpu_count() - 1, initializer=init_processes, initargs=(utils,))
# No need to make an explicit list (map will do that for you) ???
devices = list(itertools.combinations(list(utils.raw_data().keys()), 2))
results = pool.map(super().evaluate_without_prefix, devices)
return results
def main():
utils = Utils()
# Initialize utils:
...
data = [ipaddress.IPv4Network(address) for address in ip_addresses]
utils.set_raw_data(device_name, data)
child = Child()
results = child.execute(utils)
if __name__ == '__main__':
main()
进一步说明
以下程序的主进程调用class方法Foo.set_x
更新class属性x
为10的值,然后创建多处理池并调用worker函数worker
,它打印出 Foo.x
.
在 Windows 上,它使用 OS spawn
创建新进程,池中的进程在调用 worker 函数之前基本上通过启动新的 Python 解释器并重新执行源程序,在全局范围内执行每条语句。因此 Foo
的 class 定义是由编译它的 Python 解释器创建的;不涉及酸洗。但是 Foo.x
将是 0。
Linux 上的同一程序 运行,它使用 OS fork
创建新进程,继承了写时复制地址 space从主进程。因此,它将拥有 Foo
class 的副本,因为它在创建多处理池时存在,并且 Foo.x
将 为 10。
我上面的解决方案是 Windows 平台,也适用于 Linux。当然,另一种方法是将 Util
实例作为附加参数传递给辅助函数,而不是使用池初始化程序,但这通常效率不高,因为池中的进程数通常少于worker 函数被调用的次数,因此池初始化方法需要的 pickling 更少。
from multiprocessing import Pool
class Foo:
x = 0
@classmethod
def set_x(cls, x):
cls.x = x
def worker():
print(Foo.x)
if __name__ == '__main__':
Foo.set_x(10)
pool = Pool(1)
pool.apply(worker)