Python Multiprocessing (Splitting data in smaller chunks - multiple function arguments)


Note from 22.02.21: my problem can probably also be solved by more efficient memory usage instead of multiprocessing, since I realize the memory load becomes very high and may be a limiting factor here.


I am trying to reduce the running time of my script by using multiprocessing. In the past I got some good tips on speeding up the function itself (), but now I would like to make use of all the cores of a 32-core workstation. My function compares the entries of two lists (X and Y) against reference lists Q and Z. For every element of X/Y, it checks whether X[i] appears somewhere in Q and whether Y[i] appears in Z. If X[i] == Q[s] AND Y[i] == Z[s], it returns the index "s". (Note: my real data consists of DNA sequencing reads, and I need to map my reads against a reference.)
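
To make the matching concrete, here is a tiny hand-made example (illustrative values only, not my real data) of the result I am after:

# Q/Z is the reference, X/Y are the reads, index labels each read
Q = ['A', 'C', 'A', 'G']
Z = ['T', 'T', 'G', 'G']
index = ['0', '1']
X = ['A', 'C']
Y = ['G', 'T']
# X[0]/Y[0] == ('A', 'G') matches Q[2]/Z[2] -> index 2
# X[1]/Y[1] == ('C', 'T') matches Q[1]/Z[1] -> index 1
# expected result: {'0': [2], '1': [1]}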

What I have tried so far:

My question:

Does anyone know how to solve this, or can suggest a better way of using multiprocessing in my code?

Here is the code:

import numpy as np
import multiprocessing
import concurrent.futures

np.random.seed(1)

def matchdictfunc(index,x,y,q,z):  # function to match entries of X and Y to Q and Z and get index of Q/Z where their values match X/Y
    lookup = {}
    for i, (q, z) in enumerate(zip(Q, Z)):
        lookup.setdefault((q, z), []).append(i)

    matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]
    matchdict = {}
    for ind, match in enumerate(matchlist):
        matchdict[index[ind]] = match
    
    return matchdict

def split(a, n):  # function to split list in n even parts
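    # e.g. split(list(range(10)), 3) -> [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]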
    k, m = divmod(len(a), n)
    return list((a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)))

def splitinput(index,X,Y,Q,Z):  # split large lists X and Y in n-even parts (n = cpu_count), make new list containing n-times Q and Z (to feed Q and Z for every process)
    cpu_count = multiprocessing.cpu_count()

    #create multiple chunks for X and Y and index:
    index_split = split(index,cpu_count)
    X_split = split(X,cpu_count)
    Y_split = split(Y,cpu_count)

    # create list with several times Q and Z since it needs to be same length as X_split etc:
    Q_mult = []  
    Z_mult = []
    for _ in range(cpu_count):
        Q_mult.append(Q)
        Z_mult.append(Z)
    return index_split,X_split,Y_split,Q_mult,Z_mult

# N will finally scale up to 10^9
N = 10000000
M = 300

index = [str(x) for x in list(range(N))]
X = np.random.randint(M, size=N)
Y = np.random.randint(M, size=N)

# Q and Z size is fixed at 120000
Q = np.random.randint(M, size=120000)
Z = np.random.randint(M, size=120000)

# convert int32 arrays to str64 arrays and then to list, to represent original data (which are strings and not numbers)
X = np.char.mod('%d', X).tolist()
Y = np.char.mod('%d', Y).tolist()
Q = np.char.mod('%d', Q).tolist()
Z = np.char.mod('%d', Z).tolist()

# single-core:
matchdict = matchdictfunc(index,X,Y,Q,Z)

# split lists to number of processors (cpu_count)
index_split,X_split,Y_split,Q_mult,Z_mult = splitinput(index,X,Y,Q,Z)  

## Multiprocessing attempt - FAILS! (index out of range)
# finallist = []
# if __name__ == '__main__':
#     with concurrent.futures.ProcessPoolExecutor() as executor:
#         results = executor.map(matchlistfunc,X_split,Y_split,Q_mult,Z_mult)
#         for result in results:
#             finallist.append(result)
    
#         matchdict = {}
#         for d in finallist:
#             matchdict.update(d)

Your function matchdictfunc currently has parameters x, y, q and z but does not actually use them, although in the multiprocessing version it will need to use two arguments. There is also no need for the function splitinput to replicate Q and Z into the returned values Q_mult and Z_mult. Currently matchdictfunc expects Q and Z to be global variables, and we can arrange for them to exist in each process of the pool by using the initializer and initargs arguments when constructing the pool. You should also move code that does not need to be executed by the child processes, such as the array-initialization code, into the block controlled by if __name__ == '__main__':. These changes result in:

import numpy as np
import multiprocessing
import concurrent.futures

MULTIPROCESSING = True

def init_pool(q, z):
    global Q, Z
    Q = q
    Z = z

def matchdictfunc(index, X, Y):  # function to match entries of X and Y to Q and Z and get index of Q/Z where their values match X/Y
    lookup = {}
    for i, (q, z) in enumerate(zip(Q, Z)):
        lookup.setdefault((q, z), []).append(i)

    matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]
    matchdict = {}
    for ind, match in enumerate(matchlist):
        matchdict[index[ind]] = match

    return matchdict

def split(a, n):  # function to split list in n even parts
    k, m = divmod(len(a), n)
    return list((a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)))

def splitinput(index, X, Y):  # split large lists X and Y in n-even parts (n = cpu_count))
    cpu_count = multiprocessing.cpu_count()

    #create multiple chunks for X and Y and index:
    index_split = split(index,cpu_count)
    X_split = split(X,cpu_count)
    Y_split = split(Y,cpu_count)
    return index_split, X_split ,Y_split


def main():
    # following required for non-multiprocessing
    if not MULTIPROCESSING:
        global Q, Z

    np.random.seed(1)

    # N will finally scale up to 10^9
    N = 10000000
    M = 300

    index = [str(x) for x in list(range(N))]
    X = np.random.randint(M, size=N)
    Y = np.random.randint(M, size=N)

    # Q and Z size is fixed at 120000
    Q = np.random.randint(M, size=120000)
    Z = np.random.randint(M, size=120000)

    # convert int32 arrays to str64 arrays and then to list, to represent original data (which are strings and not numbers)
    X = np.char.mod('%d', X).tolist()
    Y = np.char.mod('%d', Y).tolist()
    Q = np.char.mod('%d', Q).tolist()
    Z = np.char.mod('%d', Z).tolist()

    # for non-multiprocessing:
    if not MULTIPROCESSING:
        matchdict = matchdictfunc(index, X, Y)
    else:
        # for multiprocessing:
        # split lists to number of processors (cpu_count)
        index_split, X_split, Y_split = splitinput(index, X, Y)
        with concurrent.futures.ProcessPoolExecutor(initializer=init_pool, initargs=(Q, Z)) as executor:
            finallist = [result for result in executor.map(matchdictfunc, index_split, X_split, Y_split)]
            matchdict = {}
            for d in finallist:
                matchdict.update(d)

    #print(matchdict)

if __name__ == '__main__':
    main()

Note: I tried this with a smaller value of N = 1000 (printing out the results of matchdict), and the multiprocessing version appears to return the same results. My machine simply does not have the resources to run with the full value of N without freezing everything else.
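
For small N, a quick way to check that the two versions agree is to compare the two result dictionaries directly (a hypothetical snippet; matchdict_single and matchdict_mp are illustrative names for the single-core and multiprocessing results):

def results_match(d1, d2):
    # both dicts must have the same keys mapped to the same lists of reference indices
    return d1.keys() == d2.keys() and all(d1[k] == d2[k] for k in d1)

# e.g. results_match(matchdict_single, matchdict_mp) should be True for N = 1000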

Another Approach

I am assuming that your DNA data is external and that the X and Y values can be read n values at a time, or can be read in and written out so that this is possible. Then, rather than having all of the data resident in memory and splitting it into 32 pieces, I propose that it instead be read n values at a time and thereby be broken up into roughly N/n chunks.

In the code below I have switched to using the imap method of class multiprocessing.pool.Pool. The advantage is that it lazily submits tasks to the process pool, that is, the iterable argument does not have to be a list or be convertible to a list. Instead, the pool will iterate over the iterable, sending tasks to the pool in chunksize groups. In the code below I have used a generator function as the argument to imap, which will generate successive X and Y values. Your actual generator function would instead first open the DNA file (or files) and then read in successive portions of the file.

import numpy as np
import multiprocessing

def init_pool(q, z):
    global Q, Z
    Q = q
    Z = z

def matchdictfunc(t):  # function to match entries of X and Y to Q and Z and get index of Q/Z where their values match X/Y
    index, X, Y = t
    lookup = {}
    for i, (q, z) in enumerate(zip(Q, Z)):
        lookup.setdefault((q, z), []).append(i)

    matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]
    matchdict = {}
    for ind, match in enumerate(matchlist):
        matchdict[index[ind]] = match

    return matchdict


def next_tuple(n, stop, M):
    start = 0
    while True:
        end = min(start + n, stop)
        index = [str(x) for x in list(range(start, end))]
        x = np.random.randint(M, size=n)
        y = np.random.randint(M, size=n)
        # convert int32 arrays to str64 arrays and then to list, to represent original data (which are strings and not numbers)
        x = np.char.mod('%d', x).tolist()
        y = np.char.mod('%d', y).tolist()
        yield (index, x, y)
        start = end
        if start >= stop:
            break

def compute_chunksize(XY_AT_A_TIME, N):
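    # Roughly mirrors the heuristic Pool.map uses for its default chunksize:
    # the number of tasks divided by (pool size * 4), rounded up.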
    n_tasks, remainder = divmod(N, XY_AT_A_TIME)
    if remainder:
        n_tasks += 1
    chunksize, remainder = divmod(n_tasks, multiprocessing.cpu_count() * 4)
    if remainder:
        chunksize += 1
    return chunksize


def main():
    np.random.seed(1)

    # N will finally scale up to 10^9
    N = 10000000
    M = 300

    # Q and Z size is fixed at 120000
    Q = np.random.randint(M, size=120000)
    Z = np.random.randint(M, size=120000)

    # convert int32 arrays to str64 arrays and then to list, to represent original data (which are strings and not numbers)
    Q = np.char.mod('%d', Q).tolist()
    Z = np.char.mod('%d', Z).tolist()

    matchdict = {}
    # number of X, Y pairs at a time:
    # experiment with this, especially as N increases:
    XY_AT_A_TIME = 10000
    chunksize = compute_chunksize(XY_AT_A_TIME, N)
    #print('chunksize =', chunksize) # 32 with 8 cores
    with multiprocessing.Pool(initializer=init_pool, initargs=(Q, Z)) as pool:
        for d in pool.imap(matchdictfunc, next_tuple(XY_AT_A_TIME, N, M), chunksize):
            matchdict.update(d)
    #print(matchdict)

if __name__ == '__main__':
    import time
    t = time.time()
    main()
    print('total time =', time.time() - t)

Update

I wanted to remove the use of numpy from the benchmark. It is known that numpy uses multiprocessing for some of its operations, and this can lead to degraded performance when it is used inside multiprocessing applications. So the first thing I did was to take the OP's original program and, where the code was, for example:

import numpy as np

np.random.seed(1)
X = np.random.randint(M, size=N)
X = np.char.mod('%d', X).tolist()

I replaced it with:

import random

random.seed(1)
X = [str(random.randrange(M)) for _ in range(N)]

I then timed the OP's program to get both the time spent generating the X, Y, Q and Z lists and the total time. On my desktop the times were approximately 20 seconds and 37 seconds respectively! So in my multiprocessing version, just generating the arguments for the process pool's processes accounts for more than half of the total running time. I also found, for the second approach, that as I increased the value of XY_AT_A_TIME, CPU utilization dropped from 100% to around 50%, yet the total elapsed time improved. I have not quite figured out why this is.
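
For reference, the timings above were simple wall-clock measurements along these lines (a sketch, not the exact harness I used):

import time

t0 = time.time()
# ... generate index, X, Y, Q and Z here, as in the OP's program ...
t1 = time.time()
# ... run matchdictfunc (or the multiprocessing version) here ...
t2 = time.time()
print('setup time =', t1 - t0)  # roughly 20 seconds on my desktop
print('total time =', t2 - t0)  # roughly 37 seconds on my desktop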

Next, I tried to emulate how the program would behave if it were reading its data in. So I wrote out 2 * N random integers to a file, temp.txt, modified the OP's program to initialize X and Y from the file, and then modified my second approach's next_tuple function as follows:

def next_tuple(n, stop, M):
    with open('temp.txt') as f:
        start = 0
        while True:
            end = min(start + n, stop)
            index = [str(x) for x in range(start, end)] # improvement
            x = [f.readline().strip() for _ in range(n)]
            y = [f.readline().strip() for _ in range(n)]
            yield (index, x, y)
            start = end
            if start >= stop:
                break
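
For reference, the temp.txt file read above can be produced with a small writer like the following (a sketch; the one-integer-per-line format is my assumption, chosen to match the readline().strip() calls):

import random

random.seed(1)
N = 10000000
M = 300

# write 2 * N random integers, one per line
with open('temp.txt', 'w') as f:
    for _ in range(2 * N):
        f.write(str(random.randrange(M)) + '\n')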

Again, CPU utilization dropped as I increased XY_AT_A_TIME (the best-performing value I found was 400000, at which CPU utilization was only around 40%).

Finally, I rewrote my first approach to try to be more memory-efficient (see below). This updated version again reads the random numbers from a file but uses generator functions for X, Y and index, so I do not need the memory for the full lists and for splitting them. Again, I would not expect the multiprocessing and non-multiprocessing versions to produce identical results, because of the way I am assigning the X and Y values in the two cases (a simple solution would be to write the random numbers out to an X-value file and a Y-value file and read the values back from those two files), but that has no effect on the running times. However, even though I am using the default pool size of 8, CPU utilization is still only 30-40% (it fluctuates quite a bit) and the overall running time is almost twice the non-multiprocessing running time. But why?

import random
import multiprocessing
import concurrent.futures
import time

MULTIPROCESSING = True

POOL_SIZE = multiprocessing.cpu_count()

def init_pool(q, z):
    global Q, Z
    Q = q
    Z = z

def matchdictfunc(index, X, Y):  # function to match entries of X and Y to Q and Z and get index of Q/Z where their values match X/Y
    lookup = {}
    for i, (q, z) in enumerate(zip(Q, Z)):
        lookup.setdefault((q, z), []).append(i)

    matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]
    matchdict = {}
    for ind, match in enumerate(matchlist):
        matchdict[index[ind]] = match

    return matchdict

def split(a):  # function to split list in POOL_SIZE even parts
    k, m = divmod(N, POOL_SIZE)
    divisions = [(i + 1) * k + min(i + 1, m) - (i * k + min(i, m)) for i in range(POOL_SIZE)]
    parts = []
    for division in divisions:
        part = [next(a) for _ in range(division)]
        parts.append(part)
    return parts

def splitinput(index, X, Y):  # split large lists X and Y in n-even parts (n = POOL_SIZE)
    #create multiple chunks for X and Y and index:
    index_split = split(index)
    X_split = split(X)
    Y_split = split(Y)
    return index_split, X_split ,Y_split


def main():
    global N

    # following required for non-multiprocessing
    if not MULTIPROCESSING:
        global Q, Z

    random.seed(1)

    # N will finally scale up to 10^9
    N = 10000000
    M = 300

    # Q and Z size is fixed at 120000
    Q = [str(random.randrange(M)) for _ in range(120000)]
    Z = [str(random.randrange(M)) for _ in range(120000)]

    with open('temp.txt') as f:
        # for non-multiprocessing:
        if not MULTIPROCESSING:
            index = [str(x) for x in range(N)]
            X = [f.readline().strip() for _ in range(N)]
            Y = [f.readline().strip() for _ in range(N)]
            matchdict = matchdictfunc(index, X, Y)
        else:
            # for multiprocessing:
            # split lists to number of processors (POOL_SIZE)
            # generator functions:
            index = (str(x) for x in range(N))
            X = (f.readline().strip() for _ in range(N))
            Y = (f.readline().strip() for _ in range(N))
            index_split, X_split, Y_split = splitinput(index, X, Y)
            with concurrent.futures.ProcessPoolExecutor(POOL_SIZE, initializer=init_pool, initargs=(Q, Z)) as executor:
                finallist = [result for result in executor.map(matchdictfunc, index_split, X_split, Y_split)]
                matchdict = {}
                for d in finallist:
                    matchdict.update(d)


if __name__ == '__main__':
    t = time.time()
    main()
    print('total time =', time.time() - t)

A Resolution?

Could it be that the overhead of transferring the data from the main process to the subprocesses (which involves shared-memory reads and writes) is what is slowing everything down? So this final version is an attempt to eliminate that potential cause of the slowdown. On my desktop I have 8 processors. For the first approach, dividing the N = 10000000 X and Y values among them means each process should be handling N // 8 -> 1250000 values. So I wrote out the random numbers as 16 groups of 1250000 numbers each (8 groups for X and 8 groups for Y) as a binary file, recording the offset and length of each of these 16 groups with the following code:

import random

random.seed(1)

with open('temp.txt', 'wb') as f:
    offsets = []
    for i in range(16):
        n = [str(random.randrange(300)) for _ in range(1250000)]
        b = ','.join(n).encode('ascii')
        l = len(b)
        offsets.append((f.tell(), l))
        f.write(b)

print(offsets)

I then constructed the lists X_SPECS and Y_SPECS, which the worker function matchdictfunc can use to read in the X and Y values itself as needed. So instead of passing 1250000 values at a time to this worker function, we now just pass indices 0, 1, ... 7 so that it knows which group it has to read in. Shared-memory access has been completely eliminated for accessing X and Y (it is still required for Q and Z), and the disk access has been moved into the process pool. CPU utilization will, of course, not be 100%, because the worker function is performing I/O. But I found that although the running time is now greatly improved, it still offers no improvement over the original non-multiprocessing version:

OP's original program modified to read `X` and `Y` values in from file: 26.2 seconds
Multiprocessing elapsed time: 29.2 seconds

In fact, when I changed the code to use multithreading by replacing the ProcessPoolExecutor with a ThreadPoolExecutor, the elapsed time dropped by almost another second, suggesting that there is very little contention for the Global Interpreter Lock within the worker function, i.e. most of the time is being spent in C-language code. The main work is done by:

matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]

When we do this with multiprocessing, we have multiple list comprehensions and multiple zip operations (on smaller lists) being performed by separate processes, and we then assemble the results at the end. This is speculation on my part, but taking what are already efficient operations and scaling them down across multiple processors may not buy any performance gains. Or, put another way, I am stumped and this is my best guess.
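
For completeness, the multithreading experiment mentioned above amounts to a one-line change in the final version below, since both executor classes accept the same initializer/initargs arguments (Python 3.7+):

# ProcessPoolExecutor -> ThreadPoolExecutor; everything else stays the same:
# with concurrent.futures.ThreadPoolExecutor(POOL_SIZE, initializer=init_pool, initargs=(Q_Z,)) as executor:
#     ...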

The final version (with some additional optimizations -- please take note):

import random
import concurrent.futures
import time

POOL_SIZE = 8

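# The (offset, length) pairs below are the offsets printed by the writer script above;
# the first 8 groups hold the X values and the last 8 hold the Y values.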
X_SPECS = [(0, 4541088), (4541088, 4541824), (9082912, 4540691), (13623603, 4541385), (18164988, 4541459), (22706447, 4542961), (27249408, 4541847), (31791255, 4542186)]
Y_SPECS = [(36333441, 4542101), (40875542, 4540120), (45415662, 4540802), (49956464, 4540971), (54497435, 4541427), (59038862, 4541523), (63580385, 4541571), (68121956, 4542335)]

def init_pool(q_z):
    global Q_Z
    Q_Z = q_z

def matchdictfunc(index, i):  # function to match entries of X and Y to Q and Z and get index of Q/Z where their values match X/Y
    x_offset, x_len = X_SPECS[i]
    y_offset, y_len = Y_SPECS[i]
    with open('temp.txt', 'rb') as f:
        f.seek(x_offset, 0)
        X = f.read(x_len).decode('ascii').split(',')
        f.seek(y_offset, 0)
        Y = f.read(y_len).decode('ascii').split(',')

    lookup = {}
    for i, (q, z) in enumerate(Q_Z):
        lookup.setdefault((q, z), []).append(i)

    matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]
    matchdict = {}
    for ind, match in enumerate(matchlist):
        matchdict[index[ind]] = match

    return matchdict

def split(a):  # function to split list in POOL_SIZE even parts
    k, m = divmod(N, POOL_SIZE)
    divisions = [(i + 1) * k + min(i + 1, m) - (i * k + min(i, m)) for i in range(POOL_SIZE)]
    parts = []
    for division in divisions:
        part = [next(a) for _ in range(division)]
        parts.append(part)
    return parts



def main():
    global N

    random.seed(1)

    # N will finally scale up to 10^9
    N = 10000000
    M = 300

    # Q and Z size is fixed at 120000
    Q = (str(random.randrange(M)) for _ in range(120000))
    Z = (str(random.randrange(M)) for _ in range(120000))
    Q_Z = list(zip(Q, Z)) # pre-compute the `zip` function

    # for multiprocessing:
    # split lists to number of processors (POOL_SIZE)
    # generator functions:
    index = (str(x) for x in range(N))
    index_split = split(index)
    with concurrent.futures.ProcessPoolExecutor(POOL_SIZE, initializer=init_pool, initargs=(Q_Z,)) as executor:
        finallist = executor.map(matchdictfunc, index_split, range(8))
        matchdict = {}
        for d in finallist:
            matchdict.update(d)

    print(len(matchdict))

if __name__ == '__main__':
    t = time.time()
    main()
    print('total time =', time.time() - t)

Overhead of Inter-Process Memory Transfers

In the code below, the function create_files is called to create 100 identical files, each consisting of a 'pickled' list of 1,000,000 numbers. I then use a multiprocessing pool of size 8 twice to read the 100 files and unpickle them to reconstitute the original lists. The difference between the first case (worker1) and the second case (worker2) is that in the second case the list is returned back to the caller (but not saved, so that the memory can be garbage-collected immediately). The second case took more than three times as long as the first case. This can also explain, in part, why you do not see a speedup when switching to multiprocessing.

from multiprocessing import Pool
import pickle
import time

def create_files():
    l = [i for i in range(1000000)]
    # create 100 identical files:
    for file in range(1, 101):
        with open(f'pkl/test{file}.pkl', 'wb') as f:
            pickle.dump(l, f)


def worker1(file):
    file_name = f'pkl/test{file}.pkl'
    with open(file_name, 'rb') as f:
        obj = pickle.load(f)


def worker2(file):
    file_name = f'pkl/test{file}.pkl'
    with open(file_name, 'rb') as f:
        obj = pickle.load(f)
    return file_name, obj

POOLSIZE = 8

if __name__ == '__main__':
    #create_files()

    pool = Pool(POOLSIZE)
    t = time.time()
    # no data returned:
    for file in range(1, 101):
        pool.apply_async(worker1, args=(file,))
    pool.close()
    pool.join()
    print(time.time() - t)

    pool = Pool(POOLSIZE)
    t = time.time()
    for file in range(1, 101):
        pool.apply_async(worker2, args=(file,))
    pool.close()
    pool.join()
    print(time.time() - t)

    t = time.time()
    for file in range(1, 101):
        worker2(file)
    print(time.time() - t)