共享数据集和多个参数上的 ProcessPoolExecutor

ProcessPoolExecutor on shared dataset and multiple arguments

我遇到了一个问题,我无法通过在网络上进行一些搜索来解决。

我正在使用下面的最少代码。目标是通过多处理(使用 ProcessPoolExecutor)运行 某些函数 'f_sum' 几百万次。我通过元组列表 'args' 添加多个参数。此外,该函数应该使用某种对所有执行都相同的数据(在示例中它只是一个数字)。出于内存原因,我不想将数据添加到 'args' 元组。

目前我发现的唯一选择是在“if name == 'main'”之外添加数据。这将使(出于某种我不明白的原因)变量可用于所有进程。但是,更新是不可能的。另外,我真的不想在外部定义数据,因为在实际代码中它将基于数据导入,可能需要额外的操作。

希望您能提供帮助并提前致谢!

PS:我在 Win 10 上使用 Python 3.7.9。

from concurrent.futures import ProcessPoolExecutor
import numpy as np

data = 0 # supposed to be a large data set & shared among all calculations)
num_workers = 6  # number of CPU cores
num_iterations = 10  # supposed to be large number


def f_sum(args):
    (x,y) = args
    print('This is process', x, 'with exponent:', y)
    value = 0
    for i in range(10**y):
        value += i
    return value/10**y + data


def multiprocessing(func, args, workers):
    with ProcessPoolExecutor(workers) as executor:
        results = executor.map(func, args)
    return list(results)


if __name__ == '__main__':
    data = 0.5  # try to update data, should not be part of 'args' due to memory

    args = []
    for k in range(num_iterations):
        args.append((k, np.random.randint(1,8)))

    result = multiprocessing(f_sum, args, num_workers)

    if np.abs(result[0]-np.round(result[0])) > 0:
        print('data NOT updated')

编辑原始问题:

>> 性能示例 1

from concurrent.futures import ProcessPoolExecutor
import numpy as np
import time

data_size = 10**8
num_workers = 4
num_sum = 10**7
num_iterations = 100
data = np.random.randint(0,100,size=data_size)
# data = np.linspace(0,data_size,data_size+1, dtype=np.uintc)

def f_sum(args):
    (x,y) = args
    print('This is process', x, 'random number:', y, 'last data', data[-1])
    value = 0
    for i in range(num_sum):
        value += i
    result = value - num_sum*(num_sum-1)/2 + data[-1]
    return result

def multiprocessing(func, args, workers):
    with ProcessPoolExecutor(workers) as executor:
        results = executor.map(func, args)
    return list(results)

if __name__ == '__main__':
    t0 = time.time()

    args = []
    for k in range(num_iterations):
        args.append((k, np.random.randint(1,10)))

    result = multiprocessing(f_sum, args, num_workers)

    print(f'expected result: {data[-1]}, actual result: {np.unique(result)}')
    t1 = time.time()
    print(f'total time: {t1-t0}')

>>输出

This is process 99 random number: 6 last data 9
expected result: 86, actual result: [ 3.  9. 29. 58.]
total time: 11.760863542556763

如果使用 randint 会导致错误的结果。对于 linspace 结果是正确的。

>> 性能示例 2 - 基于答案中的建议

from concurrent.futures import ProcessPoolExecutor
import numpy as np
from multiprocessing import Array
import time

data_size = 10**8
num_workers = 4
num_sum = 10**7
num_iterations = 100
input = np.random.randint(0, 100, size=data_size)
# input = np.linspace(0, data_size, data_size + 1, dtype=np.uintc)

def f_sum(args):
    (x,y) = args
    print('This is process', x, 'random number:', y, 'last data', data[-1])
    value = 0
    for i in range(num_sum):
        value += i
    result = value - num_sum*(num_sum-1)/2 + data[-1]
    return result

def init_pool(the_data):
    global data
    data = the_data

def multiprocessing(func, args, workers, input):
    data = Array('i', input, lock=False)
    with ProcessPoolExecutor(max_workers=workers, initializer=init_pool, initargs=(data,)) as executor:
        results = list(executor.map(func, args))
    return results

if __name__ == '__main__':
    t0 = time.time()
    args = []
    for k in range(num_iterations):
        args.append((k, np.random.randint(1,10)))

    result = multiprocessing(f_sum, args, num_workers, input)

    print(f'expected result: {input[-1]}, actual result:{np.unique(result)}')
    t1 = time.time()
    print(f'total time: {t1-t0}')

>>输出

This is process 99 random number: 7 last data 29
expected result: 29, actual result: [29.]
total time: 30.8266122341156

@Booboo

我在原来的问题中添加了两个示例,“性能示例 2”基于您的代码。第一个有趣的发现是,如果数据数组是用随机整数初始化的,我的原始代码实际上会给出不正确的结果。我注意到,每个进程本身都会初始化数据数组。由于它是基于随机数的,因此每个进程使用不同的数组进行计算,甚至与 main 不同。因此该用例不适用于此代码,在您的代码中它始终是正确的。

但是,如果使用 linspace,它会起作用,因为每次都会给出相同的结果。对于从文件中读取某些数据的用例(这是我的实际用例)也是如此。示例 1 仍然比示例 2 快 3 倍,我认为时间主要用于您方法中数组的初始化。

关于内存使用,我在我的任务管理器中没有发现相关差异。即使形状不同,这两个示例都会产生类似的内存增加。

我仍然相信你的方法是正确的方法,但是,在上面的例子中内存使用似乎相似并且速度较慢。

最有效的内存使用方式是使用共享内存,以便所有进程都在 data 的同一个实例上工作。如果进程更新 data,这将是绝对必要的。在下面的示例中,由于对 data 的访问是只读的,并且我使用的是一个简单的整数数组,因此我使用 multiprocessing.Array 且未指定锁定。 “技巧”是通过指定 initializerinitargs 参数来初始化池,以便池中的每个进程都可以访问此共享内存。我对代码做了一些其他更改,我已经评论了

from concurrent.futures import ProcessPoolExecutor
import numpy as np
from multiprocessing import Array, cpu_count # new imports


def init_pool(the_data):
    global data
    data = the_data

def f_sum(args):
    (x,y) = args
    print('This is process', x, 'with exponent:', y)
    value = 0
    for i in range(10**y):
        value += i
    return value/10**y + len(data) # just use the length of data for now

def multiprocessing(func, args, workers):
    data = Array('i', range(1000), lock=False) # read-only, integers 0, 1, 2, ... 999
    with ProcessPoolExecutor(max_workers=workers, initializer=init_pool, initargs=(data,)) as executor:
        results = list(executor.map(func, args)) # create the list of results here
    print(results) # so that it can be printed out for demo purposes
    return results


if __name__ == '__main__':
    num_iterations = 10  # supposed to be large number
    #num_workers = 6  # number of CPU cores
    num_workers = cpu_count()  # number of CPU cores

    args = []
    for k in range(num_iterations):
        args.append((k, np.random.randint(1,8)))

    result = multiprocessing(f_sum, args, num_workers)

    if np.abs(result[0]-np.round(result[0])) > 0:
        print('data NOT updated')

打印:

This is process 0 with exponent: 2
This is process 1 with exponent: 1
This is process 2 with exponent: 4
This is process 3 with exponent: 3
This is process 4 with exponent: 5
This is process 5 with exponent: 1
This is process 6 with exponent: 5
This is process 7 with exponent: 2
This is process 8 with exponent: 6
This is process 9 with exponent: 6
[1049.5, 1004.5, 5999.5, 1499.5, 50999.5, 1004.5, 50999.5, 1049.5, 500999.5, 500999.5]
data NOT updated

更新示例 2

你看到了我对你关于示例 1 的问题的评论。

您的示例 2 仍然不理想:您将语句 input = np.random.randint(0, 100, size=data_size) 作为每个进程不必要地执行的全局语句,因为它被初始化以用于进程池。下面是一个更新的解决方案,它还展示了一种方法,可以让您的工作函数直接与备份 multiprocessing.Array 实例的 numpy 数组一起工作,以便 numpy 数组存在于共享中记忆。您不必将此技术用于您正在做的事情,因为您仅使用 numpy 来创建随机数(我不确定为什么),但这是一种有用的技术。但是你应该在移动 input 的初始化代码后重新运行你的代码,因为它只执行一次。

我没有机会每天使用 numpy,但我了解到它在内部使用多处理来实现其自身的许多功能。因此,它通常不是与多处理一起使用的最佳匹配,尽管这似乎不适用于此处,因为即使在下面的情况下,我们也只是索引数组的一个元素,它不会使用子进程来完成它.

from concurrent.futures import ProcessPoolExecutor
import numpy as np
from multiprocessing import Array
import time
import ctypes

data_size = 10**8
num_workers = 4
num_sum = 10**7
num_iterations = 100
# input = np.linspace(0, data_size, data_size + 1, dtype=np.uintc)


def to_shared_array(arr, ctype):
    shared_array = Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array

def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)

def f_sum(args):
    (x,y) = args
    print('This is process', x, 'random number:', y, 'last data', data[-1])
    value = 0
    for i in range(num_sum):
        value += i
    result = value - num_sum*(num_sum-1)/2 + data[-1]
    return result

def init_pool(shared_array, shape):
    global data
    data = to_numpy_array(shared_array, shape)

def multiprocessing(func, args, workers, input):
    input = np.random.randint(0, 100, size=data_size)
    shape = input.shape
    shared_array = to_shared_array(input, ctypes.c_long)
    with ProcessPoolExecutor(max_workers=workers, initializer=init_pool, initargs=(shared_array, shape)) as executor:
        results = list(executor.map(func, args))
    return input, results

if __name__ == '__main__':
    t0 = time.time()
    args = []
    for k in range(num_iterations):
        args.append((k, np.random.randint(1,10)))

    input, result = multiprocessing(f_sum, args, num_workers, input)

    print(f'expected result: {input[-1]}, actual result:{np.unique(result)}')
    t1 = time.time()
    print(f'total time: {t1-t0}')