多处理池没有获得所有核心
Multiprocessing Pool not getting all cores
我有一个代码需要一些并行化,为此我使用了 Python 的 multiprocessing
模块,特别是 Pool
class。发生并行化的代码的相关部分看起来像这样
import multiprocessing as mp
import numpy as np
@jit( nopython=True )
def numba_product( a, b ):
a_len = len(a)
b_len = len(b)
n = len( a[0,:] )
c_res = np.empty( ( a_len*b_len, n ), dtype=np.complex128 )
c_count = 0
for i in range(a_len):
for j in range( b_len ):
c_res[ c_count , : ] = np.multiply( a[i,:], b[ j, : ] )
c_count += 1
return c_res
def do_some_computations( shared_object, index ):
d = shared_object.get_dictionary_1()
some_numpy_array_1 = shared_object.get_numpy_array_1( index ) #this gets a numpy array from
# shared object attribute, i.e.,
# from shared_object class
# definition, the method returns
# "self.the_array" attribute that
# belongs to shared object, see
# dummy version of class definition
# below
mask_array_1 = shared_object.get_mask_array_1() # this gets a mask for the specified array
filtered_array_1 = some_numpy_array_1[ mask_array_1] #note that this defines a local new array,
# but shouldn't modify some_numpy_array_1
# ( I believe )
s_keys = shared_object.get_keys_for_index( index ) #gets the keys corresponding to
#that index to create a new array
v = np.array( [ d1[ x ] for x in s_keys ] )
final_result = numba_product( filtered_array_1, v ) #
def pool_worker_function( index, args ):
shared_object = args[0]
result = do_some_computations( shared_object, index )
return result
def parallel_exec( shared_object, N ):
number_processors = mp.cpu_count()
number_used_processors = number_processors - 1
#try pool map method with a READ-ONLY object that is "shared_object".
# This object contains two big dictionaries from which values are retrieved,
# and various NumPy arrays of considerable dimension
from itertools import repeat
pool = mp.Pool( processes = number_used_processors )
a = list( np.linspace( 0, N, N ) )
args = ( shared_object, )
number_tasks = number_used_processors
n_chunck = int( ( len(a) - 1 )/number_tasks )
result = pool.starmap( pool_worker_function, zip( a, repeat( args ) ), chunksize = n_chunck)
pool.close()
pool.join()
return result
问题:
我遇到的问题是,当我在 Unix OS 下 运行 时,在一个 32 核系统上,我只观察到几个内核正在并行化......到目前为止据我了解,Unix 提供自动 os.fork()
作为 写时复制 ,这意味着如果我的 shared_object 在调用期间未被修改,并行化应该采取没有额外内存消耗的地方,所有内核应该分别执行它们的任务?这是程序到达并行化部分时我看到的快照:
这些让我感到困惑,我已经确定 cpu.count() 提供的内核总数是 32。我观察到的另一件事是,在整个并行化过程中,可用内存量不断减少从 ~84 GiB 到 ~59 GiB。这可能暗示每个进程正在创建“shared_object”class 的副本,因此制作 class 包含的所有字典和 NumPy 数组的副本。我想规避这个问题;我想使用所有内核进行并行化,但老实说我不知道这里发生了什么。
该代码预计在 32 核的 Unix 机器上 运行,但我自己的笔记本电脑 Windows OS,这是我在 OS 上看到的快照=47=] 当我启动它时(尽管据我所读,Windows 不支持 os.fork()
方法,所以我猜高内存消耗不足为奇?)。
如您所见,对 OS(红色)的调用占据了 CPU 使用率的很高百分比。在上面显示的 Linux 案例的快照中似乎也是这种情况。
最后,我想强调的是class“shared_object”有如下形式:
class shared_object():
def __init__(): pass
def store_dictionaries_and_arrays( dict_1, dict_2, array_1, array_2, ... ):
self.dict_1 = dict_1
self.dict_2 = dict_2
self.array_1 = array_1
# same for all other arguments passed
def get_dictionary_1():
return self.dict_1
def get_numpy_array_1():
return self.array_1
但对于更多的属性,因此有更多的“获取”方法。这是一个非常大的数据容器,因此我希望在执行并行化时没有它的副本,因为属性应该只被访问而不是被修改,我在这里错过了什么?非常感谢任何帮助,这已经困扰我很长时间了......非常感谢!
根据您的评论,我认为您只想做这样的事情:
def pool_worker_function(index, args):
return do_some_computations(_shared_hack, index)
def parallel_exec(shared_object, N):
global _shared_hack
_shared_hack = shared_object
# it'll use ncores processes by default
with mp.Pool() as pool:
return pool.map(pool_worker_function, range(N))
即将 shared_object
保存在全局某处,让子进程在需要时获取它。
你做了很多奇怪的设置,我已经去掉了,包括设置一个 chuncks
的列表,这些列表没有在任何地方使用过。我也切换到使用 range
因为你也使用 list(np.linspace(0, N, N))
来设置一些看起来坏了的索引。例如,N=4
会给你 [0, 1.333, 2.667, 4]
这看起来不像我想用
索引数组的东西
我有一个代码需要一些并行化,为此我使用了 Python 的 multiprocessing
模块,特别是 Pool
class。发生并行化的代码的相关部分看起来像这样
import multiprocessing as mp
import numpy as np
@jit( nopython=True )
def numba_product( a, b ):
a_len = len(a)
b_len = len(b)
n = len( a[0,:] )
c_res = np.empty( ( a_len*b_len, n ), dtype=np.complex128 )
c_count = 0
for i in range(a_len):
for j in range( b_len ):
c_res[ c_count , : ] = np.multiply( a[i,:], b[ j, : ] )
c_count += 1
return c_res
def do_some_computations( shared_object, index ):
d = shared_object.get_dictionary_1()
some_numpy_array_1 = shared_object.get_numpy_array_1( index ) #this gets a numpy array from
# shared object attribute, i.e.,
# from shared_object class
# definition, the method returns
# "self.the_array" attribute that
# belongs to shared object, see
# dummy version of class definition
# below
mask_array_1 = shared_object.get_mask_array_1() # this gets a mask for the specified array
filtered_array_1 = some_numpy_array_1[ mask_array_1] #note that this defines a local new array,
# but shouldn't modify some_numpy_array_1
# ( I believe )
s_keys = shared_object.get_keys_for_index( index ) #gets the keys corresponding to
#that index to create a new array
v = np.array( [ d1[ x ] for x in s_keys ] )
final_result = numba_product( filtered_array_1, v ) #
def pool_worker_function( index, args ):
shared_object = args[0]
result = do_some_computations( shared_object, index )
return result
def parallel_exec( shared_object, N ):
number_processors = mp.cpu_count()
number_used_processors = number_processors - 1
#try pool map method with a READ-ONLY object that is "shared_object".
# This object contains two big dictionaries from which values are retrieved,
# and various NumPy arrays of considerable dimension
from itertools import repeat
pool = mp.Pool( processes = number_used_processors )
a = list( np.linspace( 0, N, N ) )
args = ( shared_object, )
number_tasks = number_used_processors
n_chunck = int( ( len(a) - 1 )/number_tasks )
result = pool.starmap( pool_worker_function, zip( a, repeat( args ) ), chunksize = n_chunck)
pool.close()
pool.join()
return result
问题:
我遇到的问题是,当我在 Unix OS 下 运行 时,在一个 32 核系统上,我只观察到几个内核正在并行化......到目前为止据我了解,Unix 提供自动 os.fork()
作为 写时复制 ,这意味着如果我的 shared_object 在调用期间未被修改,并行化应该采取没有额外内存消耗的地方,所有内核应该分别执行它们的任务?这是程序到达并行化部分时我看到的快照:
这些让我感到困惑,我已经确定 cpu.count() 提供的内核总数是 32。我观察到的另一件事是,在整个并行化过程中,可用内存量不断减少从 ~84 GiB 到 ~59 GiB。这可能暗示每个进程正在创建“shared_object”class 的副本,因此制作 class 包含的所有字典和 NumPy 数组的副本。我想规避这个问题;我想使用所有内核进行并行化,但老实说我不知道这里发生了什么。
该代码预计在 32 核的 Unix 机器上 运行,但我自己的笔记本电脑 Windows OS,这是我在 OS 上看到的快照=47=] 当我启动它时(尽管据我所读,Windows 不支持 os.fork()
方法,所以我猜高内存消耗不足为奇?)。
如您所见,对 OS(红色)的调用占据了 CPU 使用率的很高百分比。在上面显示的 Linux 案例的快照中似乎也是这种情况。
最后,我想强调的是class“shared_object”有如下形式:
class shared_object():
def __init__(): pass
def store_dictionaries_and_arrays( dict_1, dict_2, array_1, array_2, ... ):
self.dict_1 = dict_1
self.dict_2 = dict_2
self.array_1 = array_1
# same for all other arguments passed
def get_dictionary_1():
return self.dict_1
def get_numpy_array_1():
return self.array_1
但对于更多的属性,因此有更多的“获取”方法。这是一个非常大的数据容器,因此我希望在执行并行化时没有它的副本,因为属性应该只被访问而不是被修改,我在这里错过了什么?非常感谢任何帮助,这已经困扰我很长时间了......非常感谢!
根据您的评论,我认为您只想做这样的事情:
def pool_worker_function(index, args):
return do_some_computations(_shared_hack, index)
def parallel_exec(shared_object, N):
global _shared_hack
_shared_hack = shared_object
# it'll use ncores processes by default
with mp.Pool() as pool:
return pool.map(pool_worker_function, range(N))
即将 shared_object
保存在全局某处,让子进程在需要时获取它。
你做了很多奇怪的设置,我已经去掉了,包括设置一个 chuncks
的列表,这些列表没有在任何地方使用过。我也切换到使用 range
因为你也使用 list(np.linspace(0, N, N))
来设置一些看起来坏了的索引。例如,N=4
会给你 [0, 1.333, 2.667, 4]
这看起来不像我想用