Python,并行处理大型文本文件

Python, process a large text file in parallel

数据文件(SAM文件)中的样本记录:

M01383  0  chr4  66439384  255  31M  *  0  0  AAGAGGA GFAFHGD  MD:Z:31 NM:i:0
M01382  0  chr1  241995435  255 31M  *  0  0  ATCCAAG AFHTTAG  MD:Z:31 NM:i:0
......

我需要逐行遍历数据文件中的记录,从每一行中获取特定值(例如第4个值,66439384),并将该值传递给另一个函数进行处理。然后一些结果计数器将被更新。

基本的工作流程是这样的:

# global variable, counters will be updated in search function according to the value passed. 
counter_a = 0    
counter_b = 0
counter_c = 0

open textfile:
    for line in textfile:
        value = line.split()[3]
        search_function(value)    # this function takes abit long time to process

def search_function (value):
    some conditions checking:
        update the counter_a or counter_b or counter_c

使用单个进程代码和大约1.5G 的数据文件,运行 花了大约20 个小时来遍历一个数据文件中的所有记录。我需要更快的代码,因为有超过 30 个此类数据文件。

我想并行处理N个chunk的数据文件,每个chunk会执行上面的workflow,更新全局变量(counter_a, counter_b, counter_c)同时。但我不知道如何在代码中实现这一点,或者这是否可行。

我可以访问一台服务器机器:24 个处理器和大约 40G RAM。

有人可以帮忙吗?非常感谢。

您不想做的是将文件交给个人 CPU。如果是这种情况,文件 open/reads 可能会导致磁头在整个磁盘上随机反弹,因为文件很可能遍布整个磁盘。

相反,将每个文件分成块并处理块。

用一个 CPU 打开文件。将整个内容读入数组文本。假设您的文件以相对较大的顺序块放置在磁盘上,您想要进行一次大规模读取以防止磁头在磁盘周围颠簸。

将其大小(以字节为单位)除以 N,给出(全局)值 K,即每个 CPU 应处理的近似字节数。分叉 N 个线程,并为每个线程 i 分配索引 i,并为每个文件分配一个复制的句柄。

每个线程 i 启动一个线程本地扫描指针 p 到 Text 作为偏移量 i*K。它扫描文本,递增 p 并忽略文本,直到找到换行符。此时,它开始处理行(在扫描行时增加 p)。 Tt 在处理一行后停止,当它在文本文件中的索引大于 (i+1)*K.

如果每行的工作量大致相等,那么您的 N 个核心将在大约同一时间完成。

(如果您有多个文件,则可以开始下一个)。

如果您知道文件大小小于内存,您可以将文件读取安排为流水线,例如,在处理当前文件时,文件读取线程正在读取下一个文件。

最简单的方法可能是使用现有代码一次完成所有 30 个文件——仍然需要一整天,但您可以一次完成所有文件。 (即“9个月9个宝宝”容易,“1个月1个宝宝”难)

如果您真的想更快地完成单个文件,这将取决于您的计数器实际更新的方式。如果几乎所有工作都只是分析价值,您可以使用多处理模块卸载它:

import time
import multiprocessing

def slowfunc(value):
    time.sleep(0.01)
    return value**2 + 0.3*value + 1

counter_a = counter_b = counter_c = 0
def add_to_counter(res):
    global counter_a, counter_b, counter_c
    counter_a += res
    counter_b -= (res - 10)**2
    counter_c += (int(res) % 2)

pool = multiprocessing.Pool(50)
results = []

for value in range(100000):
    r = pool.apply_async(slowfunc, [value])
    results.append(r)

    # don't let the queue grow too long
    if len(results) == 1000:
        results[0].wait()

    while results and results[0].ready():
        r = results.pop(0)
        add_to_counter(r.get())

for r in results:
    r.wait()
    add_to_counter(r.get())

print counter_a, counter_b, counter_c

这将允许 50 个 slowfuncs 并行 运行,因此不需要 1000s (=100k*0.01s),而是需要 20s (100k/50)*0.01s 才能完成。如果您可以像上面那样将您的函数重组为 "slowfunc" 和 "add_to_counter",您应该能够获得 24 倍的加速。

一次读取一个文件,使用所有CPU 运行 search_function():

#!/usr/bin/env python
from multiprocessing import Array, Pool

def init(counters_): # called for each child process
    global counters
    counters = counters_

def search_function (value): # assume it is CPU-intensive task
    some conditions checking:
        update the counter_a or counter_b or counter_c
        counter[0] += 1 # counter 'a'
        counter[1] += 1 # counter 'b'
    return value, result, error

if __name__ == '__main__':
    counters = Array('i', [0]*3)
    pool = Pool(initializer=init, initargs=[counters])
    values = (line.split()[3] for line in textfile)
    for value, result, error in pool.imap_unordered(search_function, values,
                                                    chunksize=1000):
        if error is not None:
            print('value: {value}, error: {error}'.format(**vars()))
    pool.close()
    pool.join()
    print(list(counters))

确保(例如,通过编写包装器)异常不会转义 next(values)search_function().

此解决方案适用于一组文件。

对于每个文件,它将其分成指定数量的行对齐块,并行求解每个块,然后合并结果。

它从磁盘流式传输每个块;这有点慢,但不会消耗太多内存。我们依靠磁盘缓存和缓冲读取来防止磁头抖动。

用法就像

python script.py -n 16 sam1.txt sam2.txt sam3.txt

并且script.py

import argparse
from io import SEEK_END 
import multiprocessing as mp

#
# Worker process
#
def summarize(fname, start, stop):
    """
    Process file[start:stop]

    start and stop both point to first char of a line (or EOF)
    """
    a = 0
    b = 0
    c = 0

    with open(fname, newline='') as inf:
        # jump to start position
        pos = start
        inf.seek(pos)

        for line in inf:
            value = int(line.split(4)[3])

            # *** START EDIT HERE ***
            #

            # update a, b, c based on value

            #
            # *** END EDIT HERE ***

            pos += len(line)
            if pos >= stop:
                break

    return a, b, c

def main(num_workers, sam_files):
    print("{} workers".format(num_workers))
    pool = mp.Pool(processes=num_workers)

    # for each input file
    for fname in sam_files:
        print("Dividing {}".format(fname))
        # decide how to divide up the file
        with open(fname) as inf:
            # get file length
            inf.seek(0, SEEK_END)
            f_len = inf.tell()
            # find break-points
            starts = [0]
            for n in range(1, num_workers):
                # jump to approximate break-point
                inf.seek(n * f_len // num_workers)
                # find start of next full line
                inf.readline()
                # store offset
                starts.append(inf.tell())

        # do it!
        stops = starts[1:] + [f_len]
        start_stops =  zip(starts, stops)
        print("Solving {}".format(fname))
        results = [pool.apply(summarize, args=(fname, start, stop)) for start,stop in start_stops]

        # collect results
        results = [sum(col) for col in zip(*results)]
        print(results)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Parallel text processor')
    parser.add_argument('--num_workers', '-n', default=8, type=int)
    parser.add_argument('sam_files', nargs='+')
    args = parser.parse_args()
    main(args.num_workers, args.sam_files)
    main(args.num_workers, args.sam_files)