Python 分块 CSV 文件多处理

Python Chunking CSV File Multiproccessing

我正在使用以下代码将 CSV 文件拆分为多个块(来自 here

def worker(chunk):
    print len(chunk)

def keyfunc(row):
    return row[0]

def main():
    pool = mp.Pool()
    largefile = 'Counseling.csv'
    num_chunks = 10
    start_time = time.time()
    results = []
    with open(largefile) as f:
        reader = csv.reader(f)
        reader.next()
        chunks = itertools.groupby(reader, keyfunc)
        while True:
            # make a list of num_chunks chunks
            groups = [list(chunk) for key, chunk in
                      itertools.islice(chunks, num_chunks)]
            if groups:
                result = pool.map(worker, groups)
                results.extend(result)
            else:
                break
    pool.close()
    pool.join()

但是,无论我选择使用多少块,块的数量似乎总是保持不变。例如,无论我选择有 1 个还是 10 个块,我在处理样本文件时总是得到这个输出。理想情况下,我想对文件进行分块,以便公平分布。

请注意,我分块的真实文件超过 1300 万行,这就是我逐个处理它的原因。那是必须的!

6
7
1
...
1
1
94
--- 0.101687192917 seconds ---

首先,如果记录尚未在键列上排序,itertools.groupby 将没有任何实际意义。 而且,如果你的需求只是将csv文件分块成预定行数交给worker,那么你就不用做这些了。

一个简单的实现是:

import csv
from multiprocessing import Pool


def worker(chunk):
    print len(chunk)

def emit_chunks(chunk_size, file_path):
    lines_count = 0
    with open(file_path) as f:
        reader = csv.reader(f)
        chunk = []
        for line in reader:
            lines_count += 1
            chunk.append(line)
            if lines_count == chunk_size:
                lines_count = 0
                yield chunk
                chunk = []
            else:
                continue
        if chunk : yield chunk

def main():
    chunk_size = 10
    gen = emit_chunks(chunk_size, 'c:/Temp/in.csv')
    p = Pool(5)
    p.imap(worker, gen)
    print 'Completed..'

*编辑:改为 pool.imap 而不是 pool.map

根据 评论, 我们希望每个进程都在 10000 行的块上工作。这并不难 去做;请参阅下面的 iter/islice 食谱。但是,使用

的问题
pool.map(worker, ten_thousand_row_chunks)

pool.map将尝试将所有块放入任务队列 马上。如果这需要比可用内存更多的内存,那么你会得到一个 MemoryError。 (注:pool.imapsuffers from the same problem。)

因此,我们需要在每个块的片段上迭代调用 pool.map

import itertools as IT
import multiprocessing as mp
import csv

def worker(chunk):
    return len(chunk)

def main():
    # num_procs is the number of workers in the pool
    num_procs = mp.cpu_count()
    # chunksize is the number of lines in a chunk
    chunksize = 10**5

    pool = mp.Pool(num_procs)
    largefile = 'Counseling.csv'
    results = []
    with open(largefile, 'rb') as f:
        reader = csv.reader(f)
        for chunk in iter(lambda: list(IT.islice(reader, chunksize*num_procs)), []):
            chunk = iter(chunk)
            pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), []))
            result = pool.map(worker, pieces)
            results.extend(result)
    print(results)
    pool.close()
    pool.join()

main()

每个 chunk 将包含文件中最多 chunksize*num_procs 行。 这是足够的数据来为池中的所有工作人员提供一些工作,但不会太大而导致 MemoryError ——前提是 chunksize 没有设置得太大。

然后每个 chunk 被分解成碎片,每个碎片最多包含 文件中的 chunksize 行。然后将这些片段发送到 pool.map.


iter(lambda: list(IT.islice(iterator, chunksize)), [])如何工作:

这是一个习惯用法,用于将迭代器分组为长度为 chunksize 的块。 让我们看看它是如何在一个例子中工作的:

In [111]: iterator = iter(range(10))

请注意,每次调用 IT.islice(iterator, 3) 时,一个包含 3 个项目的新块 从迭代器中切出:

In [112]: list(IT.islice(iterator, 3))
Out[112]: [0, 1, 2]

In [113]: list(IT.islice(iterator, 3))
Out[113]: [3, 4, 5]

In [114]: list(IT.islice(iterator, 3))
Out[114]: [6, 7, 8]

当迭代器中剩余的项目少于 3 个时,只返回剩余的项目:

In [115]: list(IT.islice(iterator, 3))
Out[115]: [9]

如果你再次调用它,你会得到一个空列表:

In [116]: list(IT.islice(iterable, 3))
Out[116]: []

lambda: list(IT.islice(iterator, chunksize)) 是一个在调用时 returns list(IT.islice(iterator, chunksize)) 的函数。它是一个 "one-liner" 相当于

def func():
    return  list(IT.islice(iterator, chunksize))

最后,iter(callable, sentinel) returns 另一个迭代器。此迭代器生成的值是可调用对象返回的值。它不断产生值,直到可调用 returns 的值等于哨兵。所以

iter(lambda: list(IT.islice(iterator, chunksize)), [])

将继续返回值 list(IT.islice(iterator, chunksize)) 直到该值为空列表:

In [121]: iterator = iter(range(10))

In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), []))
Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]