共享数据集和多个参数上的 ProcessPoolExecutor
ProcessPoolExecutor on shared dataset and multiple arguments
我遇到了一个问题,我无法通过在网络上进行一些搜索来解决。
我正在使用下面的最少代码。目标是通过多处理(使用 ProcessPoolExecutor)运行 某些函数 'f_sum' 几百万次。我通过元组列表 'args' 添加多个参数。此外,该函数应该使用某种对所有执行都相同的数据(在示例中它只是一个数字)。出于内存原因,我不想将数据添加到 'args' 元组。
目前我发现的唯一选择是在“if name == 'main'”之外添加数据。这将使(出于某种我不明白的原因)变量可用于所有进程。但是,更新是不可能的。另外,我真的不想在外部定义数据,因为在实际代码中它将基于数据导入,可能需要额外的操作。
希望您能提供帮助并提前致谢!
PS:我在 Win 10 上使用 Python 3.7.9。
from concurrent.futures import ProcessPoolExecutor
import numpy as np
data = 0 # supposed to be a large data set & shared among all calculations)
num_workers = 6 # number of CPU cores
num_iterations = 10 # supposed to be large number
def f_sum(args):
(x,y) = args
print('This is process', x, 'with exponent:', y)
value = 0
for i in range(10**y):
value += i
return value/10**y + data
def multiprocessing(func, args, workers):
with ProcessPoolExecutor(workers) as executor:
results = executor.map(func, args)
return list(results)
if __name__ == '__main__':
data = 0.5 # try to update data, should not be part of 'args' due to memory
args = []
for k in range(num_iterations):
args.append((k, np.random.randint(1,8)))
result = multiprocessing(f_sum, args, num_workers)
if np.abs(result[0]-np.round(result[0])) > 0:
print('data NOT updated')
编辑原始问题:
>> 性能示例 1
from concurrent.futures import ProcessPoolExecutor
import numpy as np
import time
data_size = 10**8
num_workers = 4
num_sum = 10**7
num_iterations = 100
data = np.random.randint(0,100,size=data_size)
# data = np.linspace(0,data_size,data_size+1, dtype=np.uintc)
def f_sum(args):
(x,y) = args
print('This is process', x, 'random number:', y, 'last data', data[-1])
value = 0
for i in range(num_sum):
value += i
result = value - num_sum*(num_sum-1)/2 + data[-1]
return result
def multiprocessing(func, args, workers):
with ProcessPoolExecutor(workers) as executor:
results = executor.map(func, args)
return list(results)
if __name__ == '__main__':
t0 = time.time()
args = []
for k in range(num_iterations):
args.append((k, np.random.randint(1,10)))
result = multiprocessing(f_sum, args, num_workers)
print(f'expected result: {data[-1]}, actual result: {np.unique(result)}')
t1 = time.time()
print(f'total time: {t1-t0}')
>>输出
This is process 99 random number: 6 last data 9
expected result: 86, actual result: [ 3. 9. 29. 58.]
total time: 11.760863542556763
如果使用 randint 会导致错误的结果。对于 linspace 结果是正确的。
>> 性能示例 2 - 基于答案中的建议
from concurrent.futures import ProcessPoolExecutor
import numpy as np
from multiprocessing import Array
import time
data_size = 10**8
num_workers = 4
num_sum = 10**7
num_iterations = 100
input = np.random.randint(0, 100, size=data_size)
# input = np.linspace(0, data_size, data_size + 1, dtype=np.uintc)
def f_sum(args):
(x,y) = args
print('This is process', x, 'random number:', y, 'last data', data[-1])
value = 0
for i in range(num_sum):
value += i
result = value - num_sum*(num_sum-1)/2 + data[-1]
return result
def init_pool(the_data):
global data
data = the_data
def multiprocessing(func, args, workers, input):
data = Array('i', input, lock=False)
with ProcessPoolExecutor(max_workers=workers, initializer=init_pool, initargs=(data,)) as executor:
results = list(executor.map(func, args))
return results
if __name__ == '__main__':
t0 = time.time()
args = []
for k in range(num_iterations):
args.append((k, np.random.randint(1,10)))
result = multiprocessing(f_sum, args, num_workers, input)
print(f'expected result: {input[-1]}, actual result:{np.unique(result)}')
t1 = time.time()
print(f'total time: {t1-t0}')
>>输出
This is process 99 random number: 7 last data 29
expected result: 29, actual result: [29.]
total time: 30.8266122341156
@Booboo
我在原来的问题中添加了两个示例,“性能示例 2”基于您的代码。第一个有趣的发现是,如果数据数组是用随机整数初始化的,我的原始代码实际上会给出不正确的结果。我注意到,每个进程本身都会初始化数据数组。由于它是基于随机数的,因此每个进程使用不同的数组进行计算,甚至与 main 不同。因此该用例不适用于此代码,在您的代码中它始终是正确的。
但是,如果使用 linspace,它会起作用,因为每次都会给出相同的结果。对于从文件中读取某些数据的用例(这是我的实际用例)也是如此。示例 1 仍然比示例 2 快 3 倍,我认为时间主要用于您方法中数组的初始化。
关于内存使用,我在我的任务管理器中没有发现相关差异。即使形状不同,这两个示例都会产生类似的内存增加。
我仍然相信你的方法是正确的方法,但是,在上面的例子中内存使用似乎相似并且速度较慢。
最有效的内存使用方式是使用共享内存,以便所有进程都在 data
的同一个实例上工作。如果进程更新 data
,这将是绝对必要的。在下面的示例中,由于对 data
的访问是只读的,并且我使用的是一个简单的整数数组,因此我使用 multiprocessing.Array
且未指定锁定。 “技巧”是通过指定 initializer
和 initargs
参数来初始化池,以便池中的每个进程都可以访问此共享内存。我对代码做了一些其他更改,我已经评论了
from concurrent.futures import ProcessPoolExecutor
import numpy as np
from multiprocessing import Array, cpu_count # new imports
def init_pool(the_data):
global data
data = the_data
def f_sum(args):
(x,y) = args
print('This is process', x, 'with exponent:', y)
value = 0
for i in range(10**y):
value += i
return value/10**y + len(data) # just use the length of data for now
def multiprocessing(func, args, workers):
data = Array('i', range(1000), lock=False) # read-only, integers 0, 1, 2, ... 999
with ProcessPoolExecutor(max_workers=workers, initializer=init_pool, initargs=(data,)) as executor:
results = list(executor.map(func, args)) # create the list of results here
print(results) # so that it can be printed out for demo purposes
return results
if __name__ == '__main__':
num_iterations = 10 # supposed to be large number
#num_workers = 6 # number of CPU cores
num_workers = cpu_count() # number of CPU cores
args = []
for k in range(num_iterations):
args.append((k, np.random.randint(1,8)))
result = multiprocessing(f_sum, args, num_workers)
if np.abs(result[0]-np.round(result[0])) > 0:
print('data NOT updated')
打印:
This is process 0 with exponent: 2
This is process 1 with exponent: 1
This is process 2 with exponent: 4
This is process 3 with exponent: 3
This is process 4 with exponent: 5
This is process 5 with exponent: 1
This is process 6 with exponent: 5
This is process 7 with exponent: 2
This is process 8 with exponent: 6
This is process 9 with exponent: 6
[1049.5, 1004.5, 5999.5, 1499.5, 50999.5, 1004.5, 50999.5, 1049.5, 500999.5, 500999.5]
data NOT updated
更新示例 2
你看到了我对你关于示例 1 的问题的评论。
您的示例 2 仍然不理想:您将语句 input = np.random.randint(0, 100, size=data_size)
作为每个进程不必要地执行的全局语句,因为它被初始化以用于进程池。下面是一个更新的解决方案,它还展示了一种方法,可以让您的工作函数直接与备份 multiprocessing.Array
实例的 numpy
数组一起工作,以便 numpy
数组存在于共享中记忆。您不必将此技术用于您正在做的事情,因为您仅使用 numpy
来创建随机数(我不确定为什么),但这是一种有用的技术。但是你应该在移动 input
的初始化代码后重新运行你的代码,因为它只执行一次。
我没有机会每天使用 numpy
,但我了解到它在内部使用多处理来实现其自身的许多功能。因此,它通常不是与多处理一起使用的最佳匹配,尽管这似乎不适用于此处,因为即使在下面的情况下,我们也只是索引数组的一个元素,它不会使用子进程来完成它.
from concurrent.futures import ProcessPoolExecutor
import numpy as np
from multiprocessing import Array
import time
import ctypes
data_size = 10**8
num_workers = 4
num_sum = 10**7
num_iterations = 100
# input = np.linspace(0, data_size, data_size + 1, dtype=np.uintc)
def to_shared_array(arr, ctype):
shared_array = Array(ctype, arr.size, lock=False)
temp = np.frombuffer(shared_array, dtype=arr.dtype)
temp[:] = arr.flatten(order='C')
return shared_array
def to_numpy_array(shared_array, shape):
'''Create a numpy array backed by a shared memory Array.'''
arr = np.ctypeslib.as_array(shared_array)
return arr.reshape(shape)
def f_sum(args):
(x,y) = args
print('This is process', x, 'random number:', y, 'last data', data[-1])
value = 0
for i in range(num_sum):
value += i
result = value - num_sum*(num_sum-1)/2 + data[-1]
return result
def init_pool(shared_array, shape):
global data
data = to_numpy_array(shared_array, shape)
def multiprocessing(func, args, workers, input):
input = np.random.randint(0, 100, size=data_size)
shape = input.shape
shared_array = to_shared_array(input, ctypes.c_long)
with ProcessPoolExecutor(max_workers=workers, initializer=init_pool, initargs=(shared_array, shape)) as executor:
results = list(executor.map(func, args))
return input, results
if __name__ == '__main__':
t0 = time.time()
args = []
for k in range(num_iterations):
args.append((k, np.random.randint(1,10)))
input, result = multiprocessing(f_sum, args, num_workers, input)
print(f'expected result: {input[-1]}, actual result:{np.unique(result)}')
t1 = time.time()
print(f'total time: {t1-t0}')
我遇到了一个问题,我无法通过在网络上进行一些搜索来解决。
我正在使用下面的最少代码。目标是通过多处理(使用 ProcessPoolExecutor)运行 某些函数 'f_sum' 几百万次。我通过元组列表 'args' 添加多个参数。此外,该函数应该使用某种对所有执行都相同的数据(在示例中它只是一个数字)。出于内存原因,我不想将数据添加到 'args' 元组。
目前我发现的唯一选择是在“if name == 'main'”之外添加数据。这将使(出于某种我不明白的原因)变量可用于所有进程。但是,更新是不可能的。另外,我真的不想在外部定义数据,因为在实际代码中它将基于数据导入,可能需要额外的操作。
希望您能提供帮助并提前致谢!
PS:我在 Win 10 上使用 Python 3.7.9。
from concurrent.futures import ProcessPoolExecutor
import numpy as np
data = 0 # supposed to be a large data set & shared among all calculations)
num_workers = 6 # number of CPU cores
num_iterations = 10 # supposed to be large number
def f_sum(args):
(x,y) = args
print('This is process', x, 'with exponent:', y)
value = 0
for i in range(10**y):
value += i
return value/10**y + data
def multiprocessing(func, args, workers):
with ProcessPoolExecutor(workers) as executor:
results = executor.map(func, args)
return list(results)
if __name__ == '__main__':
data = 0.5 # try to update data, should not be part of 'args' due to memory
args = []
for k in range(num_iterations):
args.append((k, np.random.randint(1,8)))
result = multiprocessing(f_sum, args, num_workers)
if np.abs(result[0]-np.round(result[0])) > 0:
print('data NOT updated')
编辑原始问题:
>> 性能示例 1
from concurrent.futures import ProcessPoolExecutor
import numpy as np
import time
data_size = 10**8
num_workers = 4
num_sum = 10**7
num_iterations = 100
data = np.random.randint(0,100,size=data_size)
# data = np.linspace(0,data_size,data_size+1, dtype=np.uintc)
def f_sum(args):
(x,y) = args
print('This is process', x, 'random number:', y, 'last data', data[-1])
value = 0
for i in range(num_sum):
value += i
result = value - num_sum*(num_sum-1)/2 + data[-1]
return result
def multiprocessing(func, args, workers):
with ProcessPoolExecutor(workers) as executor:
results = executor.map(func, args)
return list(results)
if __name__ == '__main__':
t0 = time.time()
args = []
for k in range(num_iterations):
args.append((k, np.random.randint(1,10)))
result = multiprocessing(f_sum, args, num_workers)
print(f'expected result: {data[-1]}, actual result: {np.unique(result)}')
t1 = time.time()
print(f'total time: {t1-t0}')
>>输出
This is process 99 random number: 6 last data 9
expected result: 86, actual result: [ 3. 9. 29. 58.]
total time: 11.760863542556763
如果使用 randint 会导致错误的结果。对于 linspace 结果是正确的。
>> 性能示例 2 - 基于答案中的建议
from concurrent.futures import ProcessPoolExecutor
import numpy as np
from multiprocessing import Array
import time
data_size = 10**8
num_workers = 4
num_sum = 10**7
num_iterations = 100
input = np.random.randint(0, 100, size=data_size)
# input = np.linspace(0, data_size, data_size + 1, dtype=np.uintc)
def f_sum(args):
(x,y) = args
print('This is process', x, 'random number:', y, 'last data', data[-1])
value = 0
for i in range(num_sum):
value += i
result = value - num_sum*(num_sum-1)/2 + data[-1]
return result
def init_pool(the_data):
global data
data = the_data
def multiprocessing(func, args, workers, input):
data = Array('i', input, lock=False)
with ProcessPoolExecutor(max_workers=workers, initializer=init_pool, initargs=(data,)) as executor:
results = list(executor.map(func, args))
return results
if __name__ == '__main__':
t0 = time.time()
args = []
for k in range(num_iterations):
args.append((k, np.random.randint(1,10)))
result = multiprocessing(f_sum, args, num_workers, input)
print(f'expected result: {input[-1]}, actual result:{np.unique(result)}')
t1 = time.time()
print(f'total time: {t1-t0}')
>>输出
This is process 99 random number: 7 last data 29
expected result: 29, actual result: [29.]
total time: 30.8266122341156
@Booboo
我在原来的问题中添加了两个示例,“性能示例 2”基于您的代码。第一个有趣的发现是,如果数据数组是用随机整数初始化的,我的原始代码实际上会给出不正确的结果。我注意到,每个进程本身都会初始化数据数组。由于它是基于随机数的,因此每个进程使用不同的数组进行计算,甚至与 main 不同。因此该用例不适用于此代码,在您的代码中它始终是正确的。
但是,如果使用 linspace,它会起作用,因为每次都会给出相同的结果。对于从文件中读取某些数据的用例(这是我的实际用例)也是如此。示例 1 仍然比示例 2 快 3 倍,我认为时间主要用于您方法中数组的初始化。
关于内存使用,我在我的任务管理器中没有发现相关差异。即使形状不同,这两个示例都会产生类似的内存增加。
我仍然相信你的方法是正确的方法,但是,在上面的例子中内存使用似乎相似并且速度较慢。
最有效的内存使用方式是使用共享内存,以便所有进程都在 data
的同一个实例上工作。如果进程更新 data
,这将是绝对必要的。在下面的示例中,由于对 data
的访问是只读的,并且我使用的是一个简单的整数数组,因此我使用 multiprocessing.Array
且未指定锁定。 “技巧”是通过指定 initializer
和 initargs
参数来初始化池,以便池中的每个进程都可以访问此共享内存。我对代码做了一些其他更改,我已经评论了
from concurrent.futures import ProcessPoolExecutor
import numpy as np
from multiprocessing import Array, cpu_count # new imports
def init_pool(the_data):
global data
data = the_data
def f_sum(args):
(x,y) = args
print('This is process', x, 'with exponent:', y)
value = 0
for i in range(10**y):
value += i
return value/10**y + len(data) # just use the length of data for now
def multiprocessing(func, args, workers):
data = Array('i', range(1000), lock=False) # read-only, integers 0, 1, 2, ... 999
with ProcessPoolExecutor(max_workers=workers, initializer=init_pool, initargs=(data,)) as executor:
results = list(executor.map(func, args)) # create the list of results here
print(results) # so that it can be printed out for demo purposes
return results
if __name__ == '__main__':
num_iterations = 10 # supposed to be large number
#num_workers = 6 # number of CPU cores
num_workers = cpu_count() # number of CPU cores
args = []
for k in range(num_iterations):
args.append((k, np.random.randint(1,8)))
result = multiprocessing(f_sum, args, num_workers)
if np.abs(result[0]-np.round(result[0])) > 0:
print('data NOT updated')
打印:
This is process 0 with exponent: 2
This is process 1 with exponent: 1
This is process 2 with exponent: 4
This is process 3 with exponent: 3
This is process 4 with exponent: 5
This is process 5 with exponent: 1
This is process 6 with exponent: 5
This is process 7 with exponent: 2
This is process 8 with exponent: 6
This is process 9 with exponent: 6
[1049.5, 1004.5, 5999.5, 1499.5, 50999.5, 1004.5, 50999.5, 1049.5, 500999.5, 500999.5]
data NOT updated
更新示例 2
你看到了我对你关于示例 1 的问题的评论。
您的示例 2 仍然不理想:您将语句 input = np.random.randint(0, 100, size=data_size)
作为每个进程不必要地执行的全局语句,因为它被初始化以用于进程池。下面是一个更新的解决方案,它还展示了一种方法,可以让您的工作函数直接与备份 multiprocessing.Array
实例的 numpy
数组一起工作,以便 numpy
数组存在于共享中记忆。您不必将此技术用于您正在做的事情,因为您仅使用 numpy
来创建随机数(我不确定为什么),但这是一种有用的技术。但是你应该在移动 input
的初始化代码后重新运行你的代码,因为它只执行一次。
我没有机会每天使用 numpy
,但我了解到它在内部使用多处理来实现其自身的许多功能。因此,它通常不是与多处理一起使用的最佳匹配,尽管这似乎不适用于此处,因为即使在下面的情况下,我们也只是索引数组的一个元素,它不会使用子进程来完成它.
from concurrent.futures import ProcessPoolExecutor
import numpy as np
from multiprocessing import Array
import time
import ctypes
data_size = 10**8
num_workers = 4
num_sum = 10**7
num_iterations = 100
# input = np.linspace(0, data_size, data_size + 1, dtype=np.uintc)
def to_shared_array(arr, ctype):
shared_array = Array(ctype, arr.size, lock=False)
temp = np.frombuffer(shared_array, dtype=arr.dtype)
temp[:] = arr.flatten(order='C')
return shared_array
def to_numpy_array(shared_array, shape):
'''Create a numpy array backed by a shared memory Array.'''
arr = np.ctypeslib.as_array(shared_array)
return arr.reshape(shape)
def f_sum(args):
(x,y) = args
print('This is process', x, 'random number:', y, 'last data', data[-1])
value = 0
for i in range(num_sum):
value += i
result = value - num_sum*(num_sum-1)/2 + data[-1]
return result
def init_pool(shared_array, shape):
global data
data = to_numpy_array(shared_array, shape)
def multiprocessing(func, args, workers, input):
input = np.random.randint(0, 100, size=data_size)
shape = input.shape
shared_array = to_shared_array(input, ctypes.c_long)
with ProcessPoolExecutor(max_workers=workers, initializer=init_pool, initargs=(shared_array, shape)) as executor:
results = list(executor.map(func, args))
return input, results
if __name__ == '__main__':
t0 = time.time()
args = []
for k in range(num_iterations):
args.append((k, np.random.randint(1,10)))
input, result = multiprocessing(f_sum, args, num_workers, input)
print(f'expected result: {input[-1]}, actual result:{np.unique(result)}')
t1 = time.time()
print(f'total time: {t1-t0}')