Python,并行处理大型文本文件
Python, process a large text file in parallel
数据文件(SAM文件)中的样本记录:
M01383 0 chr4 66439384 255 31M * 0 0 AAGAGGA GFAFHGD MD:Z:31 NM:i:0
M01382 0 chr1 241995435 255 31M * 0 0 ATCCAAG AFHTTAG MD:Z:31 NM:i:0
......
- 数据文件是逐行的
- 数据文件的大小从 1G - 5G 不等。
我需要逐行遍历数据文件中的记录,从每一行中获取特定值(例如第4个值,66439384),并将该值传递给另一个函数进行处理。然后一些结果计数器将被更新。
基本的工作流程是这样的:
# global variable, counters will be updated in search function according to the value passed.
counter_a = 0
counter_b = 0
counter_c = 0
open textfile:
for line in textfile:
value = line.split()[3]
search_function(value) # this function takes abit long time to process
def search_function (value):
some conditions checking:
update the counter_a or counter_b or counter_c
使用单个进程代码和大约1.5G 的数据文件,运行 花了大约20 个小时来遍历一个数据文件中的所有记录。我需要更快的代码,因为有超过 30 个此类数据文件。
我想并行处理N个chunk的数据文件,每个chunk会执行上面的workflow,更新全局变量(counter_a, counter_b, counter_c)同时。但我不知道如何在代码中实现这一点,或者这是否可行。
我可以访问一台服务器机器:24 个处理器和大约 40G RAM。
有人可以帮忙吗?非常感谢。
您不想做的是将文件交给个人 CPU。如果是这种情况,文件 open/reads 可能会导致磁头在整个磁盘上随机反弹,因为文件很可能遍布整个磁盘。
相反,将每个文件分成块并处理块。
用一个 CPU 打开文件。将整个内容读入数组文本。假设您的文件以相对较大的顺序块放置在磁盘上,您想要进行一次大规模读取以防止磁头在磁盘周围颠簸。
将其大小(以字节为单位)除以 N,给出(全局)值 K,即每个 CPU 应处理的近似字节数。分叉 N 个线程,并为每个线程 i 分配索引 i,并为每个文件分配一个复制的句柄。
每个线程 i 启动一个线程本地扫描指针 p 到 Text 作为偏移量 i*K。它扫描文本,递增 p 并忽略文本,直到找到换行符。此时,它开始处理行(在扫描行时增加 p)。 Tt 在处理一行后停止,当它在文本文件中的索引大于 (i+1)*K.
如果每行的工作量大致相等,那么您的 N 个核心将在大约同一时间完成。
(如果您有多个文件,则可以开始下一个)。
如果您知道文件大小小于内存,您可以将文件读取安排为流水线,例如,在处理当前文件时,文件读取线程正在读取下一个文件。
最简单的方法可能是使用现有代码一次完成所有 30 个文件——仍然需要一整天,但您可以一次完成所有文件。 (即“9个月9个宝宝”容易,“1个月1个宝宝”难)
如果您真的想更快地完成单个文件,这将取决于您的计数器实际更新的方式。如果几乎所有工作都只是分析价值,您可以使用多处理模块卸载它:
import time
import multiprocessing
def slowfunc(value):
time.sleep(0.01)
return value**2 + 0.3*value + 1
counter_a = counter_b = counter_c = 0
def add_to_counter(res):
global counter_a, counter_b, counter_c
counter_a += res
counter_b -= (res - 10)**2
counter_c += (int(res) % 2)
pool = multiprocessing.Pool(50)
results = []
for value in range(100000):
r = pool.apply_async(slowfunc, [value])
results.append(r)
# don't let the queue grow too long
if len(results) == 1000:
results[0].wait()
while results and results[0].ready():
r = results.pop(0)
add_to_counter(r.get())
for r in results:
r.wait()
add_to_counter(r.get())
print counter_a, counter_b, counter_c
这将允许 50 个 slowfuncs 并行 运行,因此不需要 1000s (=100k*0.01s),而是需要 20s (100k/50)*0.01s 才能完成。如果您可以像上面那样将您的函数重组为 "slowfunc" 和 "add_to_counter",您应该能够获得 24 倍的加速。
一次读取一个文件,使用所有CPU 运行 search_function()
:
#!/usr/bin/env python
from multiprocessing import Array, Pool
def init(counters_): # called for each child process
global counters
counters = counters_
def search_function (value): # assume it is CPU-intensive task
some conditions checking:
update the counter_a or counter_b or counter_c
counter[0] += 1 # counter 'a'
counter[1] += 1 # counter 'b'
return value, result, error
if __name__ == '__main__':
counters = Array('i', [0]*3)
pool = Pool(initializer=init, initargs=[counters])
values = (line.split()[3] for line in textfile)
for value, result, error in pool.imap_unordered(search_function, values,
chunksize=1000):
if error is not None:
print('value: {value}, error: {error}'.format(**vars()))
pool.close()
pool.join()
print(list(counters))
确保(例如,通过编写包装器)异常不会转义 next(values)
、search_function()
.
此解决方案适用于一组文件。
对于每个文件,它将其分成指定数量的行对齐块,并行求解每个块,然后合并结果。
它从磁盘流式传输每个块;这有点慢,但不会消耗太多内存。我们依靠磁盘缓存和缓冲读取来防止磁头抖动。
用法就像
python script.py -n 16 sam1.txt sam2.txt sam3.txt
并且script.py
是
import argparse
from io import SEEK_END
import multiprocessing as mp
#
# Worker process
#
def summarize(fname, start, stop):
"""
Process file[start:stop]
start and stop both point to first char of a line (or EOF)
"""
a = 0
b = 0
c = 0
with open(fname, newline='') as inf:
# jump to start position
pos = start
inf.seek(pos)
for line in inf:
value = int(line.split(4)[3])
# *** START EDIT HERE ***
#
# update a, b, c based on value
#
# *** END EDIT HERE ***
pos += len(line)
if pos >= stop:
break
return a, b, c
def main(num_workers, sam_files):
print("{} workers".format(num_workers))
pool = mp.Pool(processes=num_workers)
# for each input file
for fname in sam_files:
print("Dividing {}".format(fname))
# decide how to divide up the file
with open(fname) as inf:
# get file length
inf.seek(0, SEEK_END)
f_len = inf.tell()
# find break-points
starts = [0]
for n in range(1, num_workers):
# jump to approximate break-point
inf.seek(n * f_len // num_workers)
# find start of next full line
inf.readline()
# store offset
starts.append(inf.tell())
# do it!
stops = starts[1:] + [f_len]
start_stops = zip(starts, stops)
print("Solving {}".format(fname))
results = [pool.apply(summarize, args=(fname, start, stop)) for start,stop in start_stops]
# collect results
results = [sum(col) for col in zip(*results)]
print(results)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Parallel text processor')
parser.add_argument('--num_workers', '-n', default=8, type=int)
parser.add_argument('sam_files', nargs='+')
args = parser.parse_args()
main(args.num_workers, args.sam_files)
main(args.num_workers, args.sam_files)
数据文件(SAM文件)中的样本记录:
M01383 0 chr4 66439384 255 31M * 0 0 AAGAGGA GFAFHGD MD:Z:31 NM:i:0
M01382 0 chr1 241995435 255 31M * 0 0 ATCCAAG AFHTTAG MD:Z:31 NM:i:0
......
- 数据文件是逐行的
- 数据文件的大小从 1G - 5G 不等。
我需要逐行遍历数据文件中的记录,从每一行中获取特定值(例如第4个值,66439384),并将该值传递给另一个函数进行处理。然后一些结果计数器将被更新。
基本的工作流程是这样的:
# global variable, counters will be updated in search function according to the value passed.
counter_a = 0
counter_b = 0
counter_c = 0
open textfile:
for line in textfile:
value = line.split()[3]
search_function(value) # this function takes abit long time to process
def search_function (value):
some conditions checking:
update the counter_a or counter_b or counter_c
使用单个进程代码和大约1.5G 的数据文件,运行 花了大约20 个小时来遍历一个数据文件中的所有记录。我需要更快的代码,因为有超过 30 个此类数据文件。
我想并行处理N个chunk的数据文件,每个chunk会执行上面的workflow,更新全局变量(counter_a, counter_b, counter_c)同时。但我不知道如何在代码中实现这一点,或者这是否可行。
我可以访问一台服务器机器:24 个处理器和大约 40G RAM。
有人可以帮忙吗?非常感谢。
您不想做的是将文件交给个人 CPU。如果是这种情况,文件 open/reads 可能会导致磁头在整个磁盘上随机反弹,因为文件很可能遍布整个磁盘。
相反,将每个文件分成块并处理块。
用一个 CPU 打开文件。将整个内容读入数组文本。假设您的文件以相对较大的顺序块放置在磁盘上,您想要进行一次大规模读取以防止磁头在磁盘周围颠簸。
将其大小(以字节为单位)除以 N,给出(全局)值 K,即每个 CPU 应处理的近似字节数。分叉 N 个线程,并为每个线程 i 分配索引 i,并为每个文件分配一个复制的句柄。
每个线程 i 启动一个线程本地扫描指针 p 到 Text 作为偏移量 i*K。它扫描文本,递增 p 并忽略文本,直到找到换行符。此时,它开始处理行(在扫描行时增加 p)。 Tt 在处理一行后停止,当它在文本文件中的索引大于 (i+1)*K.
如果每行的工作量大致相等,那么您的 N 个核心将在大约同一时间完成。
(如果您有多个文件,则可以开始下一个)。
如果您知道文件大小小于内存,您可以将文件读取安排为流水线,例如,在处理当前文件时,文件读取线程正在读取下一个文件。
最简单的方法可能是使用现有代码一次完成所有 30 个文件——仍然需要一整天,但您可以一次完成所有文件。 (即“9个月9个宝宝”容易,“1个月1个宝宝”难)
如果您真的想更快地完成单个文件,这将取决于您的计数器实际更新的方式。如果几乎所有工作都只是分析价值,您可以使用多处理模块卸载它:
import time
import multiprocessing
def slowfunc(value):
time.sleep(0.01)
return value**2 + 0.3*value + 1
counter_a = counter_b = counter_c = 0
def add_to_counter(res):
global counter_a, counter_b, counter_c
counter_a += res
counter_b -= (res - 10)**2
counter_c += (int(res) % 2)
pool = multiprocessing.Pool(50)
results = []
for value in range(100000):
r = pool.apply_async(slowfunc, [value])
results.append(r)
# don't let the queue grow too long
if len(results) == 1000:
results[0].wait()
while results and results[0].ready():
r = results.pop(0)
add_to_counter(r.get())
for r in results:
r.wait()
add_to_counter(r.get())
print counter_a, counter_b, counter_c
这将允许 50 个 slowfuncs 并行 运行,因此不需要 1000s (=100k*0.01s),而是需要 20s (100k/50)*0.01s 才能完成。如果您可以像上面那样将您的函数重组为 "slowfunc" 和 "add_to_counter",您应该能够获得 24 倍的加速。
一次读取一个文件,使用所有CPU 运行 search_function()
:
#!/usr/bin/env python
from multiprocessing import Array, Pool
def init(counters_): # called for each child process
global counters
counters = counters_
def search_function (value): # assume it is CPU-intensive task
some conditions checking:
update the counter_a or counter_b or counter_c
counter[0] += 1 # counter 'a'
counter[1] += 1 # counter 'b'
return value, result, error
if __name__ == '__main__':
counters = Array('i', [0]*3)
pool = Pool(initializer=init, initargs=[counters])
values = (line.split()[3] for line in textfile)
for value, result, error in pool.imap_unordered(search_function, values,
chunksize=1000):
if error is not None:
print('value: {value}, error: {error}'.format(**vars()))
pool.close()
pool.join()
print(list(counters))
确保(例如,通过编写包装器)异常不会转义 next(values)
、search_function()
.
此解决方案适用于一组文件。
对于每个文件,它将其分成指定数量的行对齐块,并行求解每个块,然后合并结果。
它从磁盘流式传输每个块;这有点慢,但不会消耗太多内存。我们依靠磁盘缓存和缓冲读取来防止磁头抖动。
用法就像
python script.py -n 16 sam1.txt sam2.txt sam3.txt
并且script.py
是
import argparse
from io import SEEK_END
import multiprocessing as mp
#
# Worker process
#
def summarize(fname, start, stop):
"""
Process file[start:stop]
start and stop both point to first char of a line (or EOF)
"""
a = 0
b = 0
c = 0
with open(fname, newline='') as inf:
# jump to start position
pos = start
inf.seek(pos)
for line in inf:
value = int(line.split(4)[3])
# *** START EDIT HERE ***
#
# update a, b, c based on value
#
# *** END EDIT HERE ***
pos += len(line)
if pos >= stop:
break
return a, b, c
def main(num_workers, sam_files):
print("{} workers".format(num_workers))
pool = mp.Pool(processes=num_workers)
# for each input file
for fname in sam_files:
print("Dividing {}".format(fname))
# decide how to divide up the file
with open(fname) as inf:
# get file length
inf.seek(0, SEEK_END)
f_len = inf.tell()
# find break-points
starts = [0]
for n in range(1, num_workers):
# jump to approximate break-point
inf.seek(n * f_len // num_workers)
# find start of next full line
inf.readline()
# store offset
starts.append(inf.tell())
# do it!
stops = starts[1:] + [f_len]
start_stops = zip(starts, stops)
print("Solving {}".format(fname))
results = [pool.apply(summarize, args=(fname, start, stop)) for start,stop in start_stops]
# collect results
results = [sum(col) for col in zip(*results)]
print(results)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Parallel text processor')
parser.add_argument('--num_workers', '-n', default=8, type=int)
parser.add_argument('sam_files', nargs='+')
args = parser.parse_args()
main(args.num_workers, args.sam_files)
main(args.num_workers, args.sam_files)