Python Multiprocessing (Splitting data in smaller chunks - multiple function arguments)
Note from 22.02.21:
- My problem might also be solved through more efficient memory usage instead of multiprocessing, since I realized that the memory load gets very high and may be a limiting factor here.

I am trying to reduce the time my script needs to run by using multiprocessing. In the past I got some good tips about increasing the speed of the function itself (), but now I would like to make use of all cores of a 32-core workstation.

My function compares entries of two lists (X and Y) against reference lists Q and Z. For every element of X/Y, it checks whether X[i] occurs somewhere in Q and whether Y[i] occurs in Z. If X[i] == Q[s] AND Y[i] == Z[s], it returns the index "s".
(Note: my real data consists of DNA sequencing reads, and I need to map my reads against a reference.)
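To make the matching semantics concrete, here is a minimal toy illustration (made-up values, not real sequencing data), showing the dictionary the function is expected to produce:

# Toy illustration of the matching (hypothetical data):
Q = ['A', 'C', 'A']            # reference list 1
Z = ['T', 'G', 'T']            # reference list 2
X = ['A', 'C']                 # query list 1
Y = ['T', 'G']                 # query list 2
index = ['0', '1']
# (X[0], Y[0]) == ('A', 'T') matches (Q[s], Z[s]) at s = 0 and s = 2
# (X[1], Y[1]) == ('C', 'G') matches at s = 1
# expected matchdict: {'0': [0, 2], '1': [1]}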
What I have tried so far:
- Splitting my long lists X and Y into even chunks (n chunks, where n == cpu_count)
- Trying concurrent.futures.ProcessPoolExecutor() to run the function for each "sublist" in parallel, and finally combining the results of each process into one final dictionary (matchdict). (--> see the commented-out section)

My problem:
- All cores are being used when I uncomment the multiprocessing part, but it ends with an error (index out of range) which I cannot resolve. (--> Hint: lower N to 1000 and you will see the error immediately without having to wait forever)

Does anyone know how to solve this, or can suggest a better way to use multiprocessing in my code?

Here is the code:
import numpy as np
import multiprocessing
import concurrent.futures
np.random.seed(1)
def matchdictfunc(index,x,y,q,z): # function to match entries of X and Y to Q and Z and get index of Q/Z where their values match X/Y
    lookup = {}
    for i, (q, z) in enumerate(zip(Q, Z)):
        lookup.setdefault((q, z), []).append(i)
    matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]
    matchdict = {}
    for ind, match in enumerate(matchlist):
        matchdict[index[ind]] = match
    return matchdict

def split(a, n): # function to split list in n even parts
    k, m = divmod(len(a), n)
    return list((a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)))

def splitinput(index,X,Y,Q,Z): # split large lists X and Y in n even parts (n = cpu_count), make new list containing n-times Q and Z (to feed Q and Z to every process)
    cpu_count = multiprocessing.cpu_count()
    # create multiple chunks for X and Y and index:
    index_split = split(index,cpu_count)
    X_split = split(X,cpu_count)
    Y_split = split(Y,cpu_count)
    # create list with several times Q and Z since it needs to be same length as X_split etc:
    Q_mult = []
    Z_mult = []
    for _ in range(cpu_count):
        Q_mult.append(Q)
        Z_mult.append(Z)
    return index_split,X_split,Y_split,Q_mult,Z_mult
# N will finally scale up to 10^9
N = 10000000
M = 300
index = [str(x) for x in list(range(N))]
X = np.random.randint(M, size=N)
Y = np.random.randint(M, size=N)
# Q and Z size is fixed at 120000
Q = np.random.randint(M, size=120000)
Z = np.random.randint(M, size=120000)
# convert int32 arrays to str64 arrays and then to list, to represent original data (which are strings and not numbers)
X = np.char.mod('%d', X).tolist()
Y = np.char.mod('%d', Y).tolist()
Q = np.char.mod('%d', Q).tolist()
Z = np.char.mod('%d', Z).tolist()
# single-core:
matchdict = matchdictfunc(index,X,Y,Q,Z)
# split lists to number of processors (cpu_count)
index_split,X_split,Y_split,Q_mult,Z_mult = splitinput(index,X,Y,Q,Z)
## Multiprocessing attempt - FAILS! (index out of range)
# finallist = []
# if __name__ == '__main__':
#     with concurrent.futures.ProcessPoolExecutor() as executor:
#         results = executor.map(matchlistfunc,X_split,Y_split,Q_mult,Z_mult)
#         for result in results:
#             finallist.append(result)
#     matchdict = {}
#     for d in finallist:
#         matchdict.update(d)
Your function matchdictfunc currently has parameters x, y, q, z but does not actually use them, although in the multiprocessing version it will be required to use two arguments. There is also no need for function splitinput to replicate Q and Z into the returned values Q_mult and Z_mult. Currently, matchdictfunc expects Q and Z to be global variables, and we can arrange for them to exist in each process of the pool by using the initializer and initargs arguments when constructing the pool. You should also move code that does not need to be executed by the subprocesses, such as the array-initialization code, into the block governed by if __name__ == '__main__':. These changes result in:
import numpy as np
import multiprocessing
import concurrent.futures
MULTIPROCESSING = True
def init_pool(q, z):
    global Q, Z
    Q = q
    Z = z

def matchdictfunc(index, X, Y): # function to match entries of X and Y to Q and Z and get index of Q/Z where their values match X/Y
    lookup = {}
    for i, (q, z) in enumerate(zip(Q, Z)):
        lookup.setdefault((q, z), []).append(i)
    matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]
    matchdict = {}
    for ind, match in enumerate(matchlist):
        matchdict[index[ind]] = match
    return matchdict

def split(a, n): # function to split list in n even parts
    k, m = divmod(len(a), n)
    return list((a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)))

def splitinput(index, X, Y): # split large lists X and Y in n even parts (n = cpu_count)
    cpu_count = multiprocessing.cpu_count()
    # create multiple chunks for X and Y and index:
    index_split = split(index,cpu_count)
    X_split = split(X,cpu_count)
    Y_split = split(Y,cpu_count)
    return index_split, X_split, Y_split

def main():
    # following required for non-multiprocessing
    if not MULTIPROCESSING:
        global Q, Z
    np.random.seed(1)
    # N will finally scale up to 10^9
    N = 10000000
    M = 300
    index = [str(x) for x in list(range(N))]
    X = np.random.randint(M, size=N)
    Y = np.random.randint(M, size=N)
    # Q and Z size is fixed at 120000
    Q = np.random.randint(M, size=120000)
    Z = np.random.randint(M, size=120000)
    # convert int32 arrays to str64 arrays and then to list, to represent original data (which are strings and not numbers)
    X = np.char.mod('%d', X).tolist()
    Y = np.char.mod('%d', Y).tolist()
    Q = np.char.mod('%d', Q).tolist()
    Z = np.char.mod('%d', Z).tolist()
    # for non-multiprocessing:
    if not MULTIPROCESSING:
        matchdict = matchdictfunc(index, X, Y)
    else:
        # for multiprocessing:
        # split lists to number of processors (cpu_count)
        index_split, X_split, Y_split = splitinput(index, X, Y)
        with concurrent.futures.ProcessPoolExecutor(initializer=init_pool, initargs=(Q, Z)) as executor:
            finallist = [result for result in executor.map(matchdictfunc, index_split, X_split, Y_split)]
        matchdict = {}
        for d in finallist:
            matchdict.update(d)
    #print(matchdict)

if __name__ == '__main__':
    main()
Note: I tried a smaller value of N = 1000 (printing out the results of matchdict), and the multiprocessing version appeared to return the same results. My machine does not have the resources to run with the full value of N without freezing up everything else.
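One quick way to convince yourself of that equivalence (a sketch using tiny toy data; run it in the same module so that the matchdictfunc and split defined above are in scope):

# Sanity check (sketch): merging per-chunk results must equal the single-pass result.
Q = ['1', '2', '3']; Z = ['9', '8', '7']       # tiny globals for matchdictfunc
index = [str(i) for i in range(6)]
X = ['1', '2', '3', '1', '2', '3']
Y = ['9', '8', '7', '9', '9', '9']
whole = matchdictfunc(index, X, Y)             # single pass over all data
merged = {}
for idx_c, x_c, y_c in zip(split(index, 3), split(X, 3), split(Y, 3)):
    merged.update(matchdictfunc(idx_c, x_c, y_c))  # emulate per-process chunks
assert whole == merged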
Another Approach

I will assume that your DNA data is external and that the X and Y values can be read n values at a time, or can be read in and written out so that this becomes possible. Then, rather than having all the data resident in memory and splitting it into 32 pieces, I propose that it be read n values at a time and thereby broken up into approximately N/n chunks.

In the following code I have switched to using the imap method of class multiprocessing.pool.Pool. The advantage is that it lazily submits tasks to the process pool, that is, the iterable argument does not have to be a list or be convertible to a list. Instead, the pool will iterate over the iterable, sending tasks to the pool in chunksize groups. In the code below I have used a generator function as the argument to imap, which generates successive X and Y values. Your actual generator function would first open the DNA file (or files) and read in successive portions of the file.
import numpy as np
import multiprocessing
def init_pool(q, z):
    global Q, Z
    Q = q
    Z = z

def matchdictfunc(t): # function to match entries of X and Y to Q and Z and get index of Q/Z where their values match X/Y
    index, X, Y = t
    lookup = {}
    for i, (q, z) in enumerate(zip(Q, Z)):
        lookup.setdefault((q, z), []).append(i)
    matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]
    matchdict = {}
    for ind, match in enumerate(matchlist):
        matchdict[index[ind]] = match
    return matchdict

def next_tuple(n, stop, M):
    start = 0
    while True:
        end = min(start + n, stop)
        index = [str(x) for x in list(range(start, end))]
        x = np.random.randint(M, size=n)
        y = np.random.randint(M, size=n)
        # convert int32 arrays to str64 arrays and then to list, to represent original data (which are strings and not numbers)
        x = np.char.mod('%d', x).tolist()
        y = np.char.mod('%d', y).tolist()
        yield (index, x, y)
        start = end
        if start >= stop:
            break

def compute_chunksize(XY_AT_A_TIME, N):
    n_tasks, remainder = divmod(N, XY_AT_A_TIME)
    if remainder:
        n_tasks += 1
    chunksize, remainder = divmod(n_tasks, multiprocessing.cpu_count() * 4)
    if remainder:
        chunksize += 1
    return chunksize

def main():
    np.random.seed(1)
    # N will finally scale up to 10^9
    N = 10000000
    M = 300
    # Q and Z size is fixed at 120000
    Q = np.random.randint(M, size=120000)
    Z = np.random.randint(M, size=120000)
    # convert int32 arrays to str64 arrays and then to list, to represent original data (which are strings and not numbers)
    Q = np.char.mod('%d', Q).tolist()
    Z = np.char.mod('%d', Z).tolist()
    matchdict = {}
    # number of X, Y pairs at a time:
    # experiment with this, especially as N increases:
    XY_AT_A_TIME = 10000
    chunksize = compute_chunksize(XY_AT_A_TIME, N)
    #print('chunksize =', chunksize) # 32 with 8 cores
    with multiprocessing.Pool(initializer=init_pool, initargs=(Q, Z)) as pool:
        for d in pool.imap(matchdictfunc, next_tuple(XY_AT_A_TIME, N, M), chunksize):
            matchdict.update(d)
    #print(matchdict)

if __name__ == '__main__':
    import time
    t = time.time()
    main()
    print('total time =', time.time() - t)
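To see what compute_chunksize yields for the values used above, the arithmetic can be traced by hand (assuming an 8-core machine, matching the comment in the code):

# Tracing compute_chunksize(10000, 10000000) on an assumed 8-core machine:
# n_tasks   = ceil(10000000 / 10000) = 1000 tuples yielded by next_tuple in total
# chunksize = ceil(1000 / (8 * 4))   = ceil(31.25) = 32
# so imap dispatches the generated (index, x, y) tuples to the workers 32 at a time
print(compute_chunksize(10000, 10000000))  # -> 32 with 8 cores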
Update

I wanted to remove the use of numpy from the benchmark. numpy is known to use multiprocessing for some of its operations, and its use in multiprocessing applications can be a cause of degraded performance. So the first thing I did was take the OP's original program and, where the code was, for example:
import numpy as np
np.random.seed(1)
X = np.random.randint(M, size=N)
X = np.char.mod('%d', X).tolist()
I replaced it with:
import random
random.seed(1)
X = [str(random.randrange(M)) for _ in range(N)]
I then timed the OP's program to get both the time for generating the X, Y, Q and Z lists and the total time. On my desktop those times were approximately 20 seconds and 37 seconds respectively! So in my multiprocessing version, just generating the arguments for the process pool's processes accounts for more than half of the total running time. I also discovered, for the second approach, that as I increased the value of XY_AT_A_TIME, CPU utilization went down from 100% to around 50%, yet the total elapsed time improved. I have not quite figured out why this is.
Next I tried to emulate how the programs would perform if they were reading in their data. So I wrote out 2 * N random integers to a file, temp.txt, modified the OP's program to initialize X and Y from the file, and then modified my second approach's next_tuple function as follows:
def next_tuple(n, stop, M):
    with open('temp.txt') as f:
        start = 0
        while True:
            end = min(start + n, stop)
            index = [str(x) for x in range(start, end)] # improvement
            x = [f.readline().strip() for _ in range(n)]
            y = [f.readline().strip() for _ in range(n)]
            yield (index, x, y)
            start = end
            if start >= stop:
                break
As I increased XY_AT_A_TIME, CPU utilization went down again (the best-performing value I found was 400000, with CPU utilization only around 40%).

Finally, I rewrote my first approach trying to be more memory-efficient (see below). This updated version again reads the random numbers from a file, but uses generator functions for X, Y and index so that I do not need memory for both the full lists and the splits. Again, I do not expect identical results from the multiprocessing and non-multiprocessing versions because of the way I allocate the X and Y values in the two cases (a simple solution would have been to write the random numbers out to an X-values file and a Y-values file and read the values back from the two files). But that should have no effect on the running times. However, despite using the default pool size of 8, CPU utilization was again only 30-40% (it fluctuated quite a bit), and the overall running time was almost double the non-multiprocessing running time. But why?
import random
import multiprocessing
import concurrent.futures
import time
MULTIPROCESSING = True
POOL_SIZE = multiprocessing.cpu_count()
def init_pool(q, z):
    global Q, Z
    Q = q
    Z = z

def matchdictfunc(index, X, Y): # function to match entries of X and Y to Q and Z and get index of Q/Z where their values match X/Y
    lookup = {}
    for i, (q, z) in enumerate(zip(Q, Z)):
        lookup.setdefault((q, z), []).append(i)
    matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]
    matchdict = {}
    for ind, match in enumerate(matchlist):
        matchdict[index[ind]] = match
    return matchdict

def split(a): # function to split list in POOL_SIZE even parts
    k, m = divmod(N, POOL_SIZE)
    divisions = [(i + 1) * k + min(i + 1, m) - (i * k + min(i, m)) for i in range(POOL_SIZE)]
    parts = []
    for division in divisions:
        part = [next(a) for _ in range(division)]
        parts.append(part)
    return parts

def splitinput(index, X, Y): # split large lists X and Y in n even parts (n = POOL_SIZE)
    # create multiple chunks for X and Y and index:
    index_split = split(index)
    X_split = split(X)
    Y_split = split(Y)
    return index_split, X_split, Y_split

def main():
    global N
    # following required for non-multiprocessing
    if not MULTIPROCESSING:
        global Q, Z
    random.seed(1)
    # N will finally scale up to 10^9
    N = 10000000
    M = 300
    # Q and Z size is fixed at 120000
    Q = [str(random.randrange(M)) for _ in range(120000)]
    Z = [str(random.randrange(M)) for _ in range(120000)]
    with open('temp.txt') as f:
        # for non-multiprocessing:
        if not MULTIPROCESSING:
            index = [str(x) for x in range(N)]
            X = [f.readline().strip() for _ in range(N)]
            Y = [f.readline().strip() for _ in range(N)]
            matchdict = matchdictfunc(index, X, Y)
        else:
            # for multiprocessing:
            # split lists to number of processors (POOL_SIZE)
            # generator functions:
            index = (str(x) for x in range(N))
            X = (f.readline().strip() for _ in range(N))
            Y = (f.readline().strip() for _ in range(N))
            index_split, X_split, Y_split = splitinput(index, X, Y)
            with concurrent.futures.ProcessPoolExecutor(POOL_SIZE, initializer=init_pool, initargs=(Q, Z)) as executor:
                finallist = [result for result in executor.map(matchdictfunc, index_split, X_split, Y_split)]
            matchdict = {}
            for d in finallist:
                matchdict.update(d)

if __name__ == '__main__':
    t = time.time()
    main()
    print('total time =', time.time() - t)
Resolution?

Could it be that the overhead of transferring the data from the main process to the subprocesses (which involves shared memory reads and writes) is what is slowing everything down? So this final version was an attempt to eliminate that potential cause of the slowdown. On my desktop I have 8 processors. For the first approach, dividing the N = 10000000 X and Y values among them means each process should be processing N // 8 -> 1250000 values. So I wrote the random numbers out as 16 groups of 1250000 numbers (8 groups for X and 8 groups for Y) as a binary file, noting the offset and length of each of these 16 groups with the following code:
import random

random.seed(1)
with open('temp.txt', 'wb') as f:
    offsets = []
    for i in range(16):
        n = [str(random.randrange(300)) for _ in range(1250000)]
        b = ','.join(n).encode('ascii')
        l = len(b)
        offsets.append((f.tell(), l))
        f.write(b)
print(offsets)
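The printed offsets split naturally into the two spec lists used by the final version below (a sketch: the hardcoded X_SPECS and Y_SPECS are just this list cut in half, the first 8 groups having been written for X and the last 8 for Y):

# Building the specs from the recorded offsets (sketch):
X_SPECS = offsets[:8]   # (offset, length) pairs for the 8 groups of X values
Y_SPECS = offsets[8:]   # (offset, length) pairs for the 8 groups of Y values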
I then constructed the lists X_SPECS and Y_SPECS that the worker function matchdictfunc can use to read in the X and Y values itself, as needed. So now, instead of passing 1250000 values at a time to this worker function, we just pass indices 0, 1, ... 7 to it so that it knows which group it has to read in. Shared memory access has been completely eliminated for accessing X and Y (it is still required for Q and Z), and the disk access has been moved to the process pool. CPU utilization will, of course, not be 100%, because the worker function is doing I/O. But I found that although the running time was now greatly improved, it still offered no improvement over the original non-multiprocessing version:
OP's original program modified to read `X` and `Y` values in from file: 26.2 seconds
Multiprocessing elapsed time: 29.2 seconds
In fact, when I changed the code to use multithreading by replacing ProcessPoolExecutor with ThreadPoolExecutor (see the sketch after the final version below), the elapsed time went down by almost another full second, demonstrating that there is very little contention for the Global Interpreter Lock within the worker function, i.e. most of the time is being spent in C-language code. The main work is done by:

matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]

When we do this with multiprocessing, we have multiple list comprehensions and multiple zip operations (on smaller lists) being performed by separate processes, and then we assemble the results at the end. This is conjecture on my part, but there may just not be any performance gains to be had by taking what are already efficient operations and scaling them down across multiple processors. Or, in other words, I am stumped and that is my best guess.
Final version (with some additional optimizations -- please note them):
import random
import concurrent.futures
import time
POOL_SIZE = 8
X_SPECS = [(0, 4541088), (4541088, 4541824), (9082912, 4540691), (13623603, 4541385), (18164988, 4541459), (22706447, 4542961), (27249408, 4541847), (31791255, 4542186)]
Y_SPECS = [(36333441, 4542101), (40875542, 4540120), (45415662, 4540802), (49956464, 4540971), (54497435, 4541427), (59038862, 4541523), (63580385, 4541571), (68121956, 4542335)]
def init_pool(q_z):
    global Q_Z
    Q_Z = q_z

def matchdictfunc(index, i): # function to match entries of X and Y to Q and Z and get index of Q/Z where their values match X/Y
    x_offset, x_len = X_SPECS[i]
    y_offset, y_len = Y_SPECS[i]
    with open('temp.txt', 'rb') as f:
        f.seek(x_offset, 0)
        X = f.read(x_len).decode('ascii').split(',')
        f.seek(y_offset, 0)
        Y = f.read(y_len).decode('ascii').split(',')
    lookup = {}
    for i, (q, z) in enumerate(Q_Z):
        lookup.setdefault((q, z), []).append(i)
    matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]
    matchdict = {}
    for ind, match in enumerate(matchlist):
        matchdict[index[ind]] = match
    return matchdict

def split(a): # function to split list in POOL_SIZE even parts
    k, m = divmod(N, POOL_SIZE)
    divisions = [(i + 1) * k + min(i + 1, m) - (i * k + min(i, m)) for i in range(POOL_SIZE)]
    parts = []
    for division in divisions:
        part = [next(a) for _ in range(division)]
        parts.append(part)
    return parts

def main():
    global N
    random.seed(1)
    # N will finally scale up to 10^9
    N = 10000000
    M = 300
    # Q and Z size is fixed at 120000
    Q = (str(random.randrange(M)) for _ in range(120000))
    Z = (str(random.randrange(M)) for _ in range(120000))
    Q_Z = list(zip(Q, Z)) # pre-compute the `zip` function
    # for multiprocessing:
    # split lists to number of processors (POOL_SIZE)
    # generator functions:
    index = (str(x) for x in range(N))
    index_split = split(index)
    with concurrent.futures.ProcessPoolExecutor(POOL_SIZE, initializer=init_pool, initargs=(Q_Z,)) as executor:
        finallist = executor.map(matchdictfunc, index_split, range(8))
        matchdict = {}
        for d in finallist:
            matchdict.update(d)
    print(len(matchdict))

if __name__ == '__main__':
    t = time.time()
    main()
    print('total time =', time.time() - t)
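For reference, the thread-based variant mentioned above is essentially a one-line change to this final version (a sketch; ThreadPoolExecutor accepts the same initializer/initargs arguments from Python 3.7 on, and threads avoid the inter-process pickling of arguments and results):

# Multithreaded variant (sketch): swap ProcessPoolExecutor for ThreadPoolExecutor.
with concurrent.futures.ThreadPoolExecutor(POOL_SIZE, initializer=init_pool, initargs=(Q_Z,)) as executor:
    finallist = executor.map(matchdictfunc, index_split, range(8))
    matchdict = {}
    for d in finallist:
        matchdict.update(d)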
The Overhead of Inter-Process Memory Transfers

In the code below, function create_files is called to create 100 identical files consisting of a "pickled" list of 1,000,000 numbers. I then use a multiprocessing pool of size 8 twice to read the 100 files and unpickle them to reconstitute the original lists. The difference between the first case (worker1) and the second case (worker2) is that in the second case the list is returned back to the caller (but not saved, so that the memory can be immediately garbage collected). The second case took more than three times as long as the first case. This can also, in part, explain why you do not see a speedup when you switch over to multiprocessing.
from multiprocessing import Pool
import pickle
import time
def create_files():
    l = [i for i in range(1000000)]
    # create 100 identical files:
    for file in range(1, 101):
        with open(f'pkl/test{file}.pkl', 'wb') as f:
            pickle.dump(l, f)

def worker1(file):
    file_name = f'pkl/test{file}.pkl'
    with open(file_name, 'rb') as f:
        obj = pickle.load(f)

def worker2(file):
    file_name = f'pkl/test{file}.pkl'
    with open(file_name, 'rb') as f:
        obj = pickle.load(f)
    return file_name, obj

POOLSIZE = 8

if __name__ == '__main__':
    #create_files()
    pool = Pool(POOLSIZE)
    t = time.time()
    # no data returned:
    for file in range(1, 101):
        pool.apply_async(worker1, args=(file,))
    pool.close()
    pool.join()
    print(time.time() - t)
    pool = Pool(POOLSIZE)
    t = time.time()
    for file in range(1, 101):
        pool.apply_async(worker2, args=(file,))
    pool.close()
    pool.join()
    print(time.time() - t)
    t = time.time()
    for file in range(1, 101):
        worker2(file)
    print(time.time() - t)