利用 "Copy-on-Write" 将数据复制到 Multiprocessing.Pool() 工作进程
Leveraging "Copy-on-Write" to Copy Data to Multiprocessing.Pool() Worker Processes
我有一些 multiprocessing
Python 代码看起来有点像这样:
import time
from multiprocessing import Pool
import numpy as np
class MyClass(object):
def __init__(self):
self.myAttribute = np.zeros(100000000) # basically a big memory struct
def my_multithreaded_analysis(self):
arg_lists = [(self, i) for i in range(10)]
pool = Pool(processes=10)
result = pool.map(call_method, arg_lists)
print result
def analyze(self, i):
time.sleep(10)
return i ** 2
def call_method(args):
my_instance, i = args
return my_instance.analyze(i)
if __name__ == '__main__':
my_instance = MyClass()
my_instance.my_multithreaded_analysis()
在阅读了其他 Whosebug 答案中关于内存如何工作的答案后,例如这个 Python multiprocessing memory usage 我的印象是,这不会按照我用于多处理的进程数的比例使用内存,因为它是写时复制,我没有修改 my_instance
的任何属性。但是,当我 运行 top 时,我确实看到所有进程的高内存,它说我的大部分进程都在使用大量内存(这是 OSX 的最高输出,但我可以在 [=25] 上复制=]).
我的问题基本上是,我的解释是否正确,因为我的 MyClass
实例实际上在池中重复了?如果是这样,我该如何防止这种情况发生?我不应该使用这样的结构吗?我的目标是减少计算分析的内存使用。
PID COMMAND %CPU TIME #TH #WQ #PORT MEM PURG CMPRS PGRP PPID STATE
2494 Python 0.0 00:01.75 1 0 7 765M 0B 0B 2484 2484 sleeping
2493 Python 0.0 00:01.85 1 0 7 765M 0B 0B 2484 2484 sleeping
2492 Python 0.0 00:01.86 1 0 7 765M 0B 0B 2484 2484 sleeping
2491 Python 0.0 00:01.83 1 0 7 765M 0B 0B 2484 2484 sleeping
2490 Python 0.0 00:01.87 1 0 7 765M 0B 0B 2484 2484 sleeping
2489 Python 0.0 00:01.79 1 0 7 167M 0B 597M 2484 2484 sleeping
2488 Python 0.0 00:01.77 1 0 7 10M 0B 755M 2484 2484 sleeping
2487 Python 0.0 00:01.75 1 0 7 8724K 0B 756M 2484 2484 sleeping
2486 Python 0.0 00:01.78 1 0 7 9968K 0B 755M 2484 2484 sleeping
2485 Python 0.0 00:01.74 1 0 7 171M 0B 594M 2484 2484 sleeping
2484 Python 0.1 00:16.43 4 0 18 775M 0B 12K 2484 2235 sleeping
发送到 pool.map
的任何内容(以及相关方法)实际上并未使用共享的 copy-on-write 资源。这些值是 "pickled" (Python's serialization mechanism),通过管道发送到工作进程并在那里解开,它从头开始在 child 中重建 object。因此,在这种情况下,每个 child 最终都会得到原始数据的 copy-on-write 版本(它从未使用过,因为它被告知使用通过 IPC 发送的副本),以及个人对child 中重建的原始数据,未共享。
如果您想利用分叉的 copy-on-write 好处,您不能通过管道发送数据(或 object 引用数据)。您必须将它们存储在可以通过访问它们自己的全局变量从 child 中找到的位置。例如:
import os
import time
from multiprocessing import Pool
import numpy as np
class MyClass(object):
def __init__(self):
self.myAttribute = os.urandom(1024*1024*1024) # basically a big memory struct(~1GB size)
def my_multithreaded_analysis(self):
arg_lists = list(range(10)) # Don't pass self
pool = Pool(processes=10)
result = pool.map(call_method, arg_lists)
print result
def analyze(self, i):
time.sleep(10)
return i ** 2
def call_method(i):
# Implicitly use global copy of my_instance, not one passed as an argument
return my_instance.analyze(i)
# Constructed globally and unconditionally, so the instance exists
# prior to forking in commonly accessible location
my_instance = MyClass()
if __name__ == '__main__':
my_instance.my_multithreaded_analysis()
通过不传递 self
,您可以避免复制,只需使用 copy-on-write 映射到 child 的单个全局 object。如果您需要多个 object,您可以在创建池之前将全局 list
或 dict
映射到 object 的实例,然后传递索引或键可以查找 object 作为 pool.map
参数的一部分。然后 worker 函数使用 index/key(必须被 pickle 并通过 IPC 发送到 child)在全局字典(也 copy-on-write 映射),因此您可以复制便宜的信息以在 child 中查找昂贵的数据而不复制它。
如果 object 太小,即使您不给它们写信,它们最终也会被复制。 CPython是引用计数的,引用计数出现在commonobjectheader中,并且不断更新,只要引用object,即使是逻辑上的non-mutating引用.如此小的 objects(以及分配在同一内存页中的所有其他 objects)将被写入,并因此被复制。对于大型 objects(您的亿元素 numpy 数组),只要您不写入它,大部分内容将保持共享状态,因为 header 仅占用许多页面之一
在 python 3.8 版中更改:在 macOS 上,spawn 启动方法现在是默认方法。参见 mulitprocessing doc。 Spawn 没有利用 copy-on-write.
或者,要利用分叉的写时复制优势,同时保留一些封装的外观,您可以 leverage class-attributes and @classmethods over pure globals
。
import time
from multiprocessing import Pool
import numpy as np
class MyClass(object):
myAttribute = np.zeros(100000000) # basically a big memory struct
# myAttribute is a class-attribute
@classmethod
def my_multithreaded_analysis(cls):
arg_list = [i for i in range(10)]
pool = Pool(processes=10)
result = pool.map(analyze, arg_list)
print result
@classmethod
def analyze(cls, i):
time.sleep(10)
# If you wanted, you could access cls.myAttribute w/o worry here.
return i ** 2
""" We don't need this proxy step !
def call_method(args):
my_instance, i = args
return my_instance.analyze(i)
"""
if __name__ == '__main__':
my_instance = MyClass()
# Note that now you can instantiate MyClass anywhere in your app,
# While still taking advantage of copy-on-write forking
my_instance.my_multithreaded_analysis()
注1:是的,我承认class-attributes
和class-methods
是美化的全局变量。但它购买了一些封装......
注意 2: 与其明确创建上面的 arg_lists
,您可以 implicitly pass the instance (self) [=15] 创建的每个任务=],通过将绑定实例方法 analyze(self)
传递给 Pool.map()
,搬起石头砸自己的脚更容易!
我有一些 multiprocessing
Python 代码看起来有点像这样:
import time
from multiprocessing import Pool
import numpy as np
class MyClass(object):
def __init__(self):
self.myAttribute = np.zeros(100000000) # basically a big memory struct
def my_multithreaded_analysis(self):
arg_lists = [(self, i) for i in range(10)]
pool = Pool(processes=10)
result = pool.map(call_method, arg_lists)
print result
def analyze(self, i):
time.sleep(10)
return i ** 2
def call_method(args):
my_instance, i = args
return my_instance.analyze(i)
if __name__ == '__main__':
my_instance = MyClass()
my_instance.my_multithreaded_analysis()
在阅读了其他 Whosebug 答案中关于内存如何工作的答案后,例如这个 Python multiprocessing memory usage 我的印象是,这不会按照我用于多处理的进程数的比例使用内存,因为它是写时复制,我没有修改 my_instance
的任何属性。但是,当我 运行 top 时,我确实看到所有进程的高内存,它说我的大部分进程都在使用大量内存(这是 OSX 的最高输出,但我可以在 [=25] 上复制=]).
我的问题基本上是,我的解释是否正确,因为我的 MyClass
实例实际上在池中重复了?如果是这样,我该如何防止这种情况发生?我不应该使用这样的结构吗?我的目标是减少计算分析的内存使用。
PID COMMAND %CPU TIME #TH #WQ #PORT MEM PURG CMPRS PGRP PPID STATE
2494 Python 0.0 00:01.75 1 0 7 765M 0B 0B 2484 2484 sleeping
2493 Python 0.0 00:01.85 1 0 7 765M 0B 0B 2484 2484 sleeping
2492 Python 0.0 00:01.86 1 0 7 765M 0B 0B 2484 2484 sleeping
2491 Python 0.0 00:01.83 1 0 7 765M 0B 0B 2484 2484 sleeping
2490 Python 0.0 00:01.87 1 0 7 765M 0B 0B 2484 2484 sleeping
2489 Python 0.0 00:01.79 1 0 7 167M 0B 597M 2484 2484 sleeping
2488 Python 0.0 00:01.77 1 0 7 10M 0B 755M 2484 2484 sleeping
2487 Python 0.0 00:01.75 1 0 7 8724K 0B 756M 2484 2484 sleeping
2486 Python 0.0 00:01.78 1 0 7 9968K 0B 755M 2484 2484 sleeping
2485 Python 0.0 00:01.74 1 0 7 171M 0B 594M 2484 2484 sleeping
2484 Python 0.1 00:16.43 4 0 18 775M 0B 12K 2484 2235 sleeping
发送到 pool.map
的任何内容(以及相关方法)实际上并未使用共享的 copy-on-write 资源。这些值是 "pickled" (Python's serialization mechanism),通过管道发送到工作进程并在那里解开,它从头开始在 child 中重建 object。因此,在这种情况下,每个 child 最终都会得到原始数据的 copy-on-write 版本(它从未使用过,因为它被告知使用通过 IPC 发送的副本),以及个人对child 中重建的原始数据,未共享。
如果您想利用分叉的 copy-on-write 好处,您不能通过管道发送数据(或 object 引用数据)。您必须将它们存储在可以通过访问它们自己的全局变量从 child 中找到的位置。例如:
import os
import time
from multiprocessing import Pool
import numpy as np
class MyClass(object):
def __init__(self):
self.myAttribute = os.urandom(1024*1024*1024) # basically a big memory struct(~1GB size)
def my_multithreaded_analysis(self):
arg_lists = list(range(10)) # Don't pass self
pool = Pool(processes=10)
result = pool.map(call_method, arg_lists)
print result
def analyze(self, i):
time.sleep(10)
return i ** 2
def call_method(i):
# Implicitly use global copy of my_instance, not one passed as an argument
return my_instance.analyze(i)
# Constructed globally and unconditionally, so the instance exists
# prior to forking in commonly accessible location
my_instance = MyClass()
if __name__ == '__main__':
my_instance.my_multithreaded_analysis()
通过不传递 self
,您可以避免复制,只需使用 copy-on-write 映射到 child 的单个全局 object。如果您需要多个 object,您可以在创建池之前将全局 list
或 dict
映射到 object 的实例,然后传递索引或键可以查找 object 作为 pool.map
参数的一部分。然后 worker 函数使用 index/key(必须被 pickle 并通过 IPC 发送到 child)在全局字典(也 copy-on-write 映射),因此您可以复制便宜的信息以在 child 中查找昂贵的数据而不复制它。
如果 object 太小,即使您不给它们写信,它们最终也会被复制。 CPython是引用计数的,引用计数出现在commonobjectheader中,并且不断更新,只要引用object,即使是逻辑上的non-mutating引用.如此小的 objects(以及分配在同一内存页中的所有其他 objects)将被写入,并因此被复制。对于大型 objects(您的亿元素 numpy 数组),只要您不写入它,大部分内容将保持共享状态,因为 header 仅占用许多页面之一
在 python 3.8 版中更改:在 macOS 上,spawn 启动方法现在是默认方法。参见 mulitprocessing doc。 Spawn 没有利用 copy-on-write.
或者,要利用分叉的写时复制优势,同时保留一些封装的外观,您可以 leverage class-attributes and @classmethods over pure globals
。
import time
from multiprocessing import Pool
import numpy as np
class MyClass(object):
myAttribute = np.zeros(100000000) # basically a big memory struct
# myAttribute is a class-attribute
@classmethod
def my_multithreaded_analysis(cls):
arg_list = [i for i in range(10)]
pool = Pool(processes=10)
result = pool.map(analyze, arg_list)
print result
@classmethod
def analyze(cls, i):
time.sleep(10)
# If you wanted, you could access cls.myAttribute w/o worry here.
return i ** 2
""" We don't need this proxy step !
def call_method(args):
my_instance, i = args
return my_instance.analyze(i)
"""
if __name__ == '__main__':
my_instance = MyClass()
# Note that now you can instantiate MyClass anywhere in your app,
# While still taking advantage of copy-on-write forking
my_instance.my_multithreaded_analysis()
注1:是的,我承认class-attributes
和class-methods
是美化的全局变量。但它购买了一些封装......
注意 2: 与其明确创建上面的 arg_lists
,您可以 implicitly pass the instance (self) [=15] 创建的每个任务=],通过将绑定实例方法 analyze(self)
传递给 Pool.map()
,搬起石头砸自己的脚更容易!