Python 分块 CSV 文件多处理
Python Chunking CSV File Multiproccessing
我正在使用以下代码将 CSV 文件拆分为多个块(来自 here)
def worker(chunk):
print len(chunk)
def keyfunc(row):
return row[0]
def main():
pool = mp.Pool()
largefile = 'Counseling.csv'
num_chunks = 10
start_time = time.time()
results = []
with open(largefile) as f:
reader = csv.reader(f)
reader.next()
chunks = itertools.groupby(reader, keyfunc)
while True:
# make a list of num_chunks chunks
groups = [list(chunk) for key, chunk in
itertools.islice(chunks, num_chunks)]
if groups:
result = pool.map(worker, groups)
results.extend(result)
else:
break
pool.close()
pool.join()
但是,无论我选择使用多少块,块的数量似乎总是保持不变。例如,无论我选择有 1 个还是 10 个块,我在处理样本文件时总是得到这个输出。理想情况下,我想对文件进行分块,以便公平分布。
请注意,我分块的真实文件超过 1300 万行,这就是我逐个处理它的原因。那是必须的!
6
7
1
...
1
1
94
--- 0.101687192917 seconds ---
首先,如果记录尚未在键列上排序,itertools.groupby 将没有任何实际意义。
而且,如果你的需求只是将csv文件分块成预定行数交给worker,那么你就不用做这些了。
一个简单的实现是:
import csv
from multiprocessing import Pool
def worker(chunk):
print len(chunk)
def emit_chunks(chunk_size, file_path):
lines_count = 0
with open(file_path) as f:
reader = csv.reader(f)
chunk = []
for line in reader:
lines_count += 1
chunk.append(line)
if lines_count == chunk_size:
lines_count = 0
yield chunk
chunk = []
else:
continue
if chunk : yield chunk
def main():
chunk_size = 10
gen = emit_chunks(chunk_size, 'c:/Temp/in.csv')
p = Pool(5)
p.imap(worker, gen)
print 'Completed..'
*编辑:改为 pool.imap 而不是 pool.map
根据
评论,
我们希望每个进程都在 10000 行的块上工作。这并不难
去做;请参阅下面的 iter/islice
食谱。但是,使用
的问题
pool.map(worker, ten_thousand_row_chunks)
是pool.map
将尝试将所有块放入任务队列
马上。如果这需要比可用内存更多的内存,那么你会得到一个
MemoryError
。 (注:pool.imap
suffers from the same problem。)
因此,我们需要在每个块的片段上迭代调用 pool.map
。
import itertools as IT
import multiprocessing as mp
import csv
def worker(chunk):
return len(chunk)
def main():
# num_procs is the number of workers in the pool
num_procs = mp.cpu_count()
# chunksize is the number of lines in a chunk
chunksize = 10**5
pool = mp.Pool(num_procs)
largefile = 'Counseling.csv'
results = []
with open(largefile, 'rb') as f:
reader = csv.reader(f)
for chunk in iter(lambda: list(IT.islice(reader, chunksize*num_procs)), []):
chunk = iter(chunk)
pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), []))
result = pool.map(worker, pieces)
results.extend(result)
print(results)
pool.close()
pool.join()
main()
每个 chunk
将包含文件中最多 chunksize*num_procs
行。
这是足够的数据来为池中的所有工作人员提供一些工作,但不会太大而导致 MemoryError ——前提是 chunksize
没有设置得太大。
然后每个 chunk
被分解成碎片,每个碎片最多包含
文件中的 chunksize
行。然后将这些片段发送到 pool.map
.
iter(lambda: list(IT.islice(iterator, chunksize)), [])
如何工作:
这是一个习惯用法,用于将迭代器分组为长度为 chunksize 的块。
让我们看看它是如何在一个例子中工作的:
In [111]: iterator = iter(range(10))
请注意,每次调用 IT.islice(iterator, 3)
时,一个包含 3 个项目的新块
从迭代器中切出:
In [112]: list(IT.islice(iterator, 3))
Out[112]: [0, 1, 2]
In [113]: list(IT.islice(iterator, 3))
Out[113]: [3, 4, 5]
In [114]: list(IT.islice(iterator, 3))
Out[114]: [6, 7, 8]
当迭代器中剩余的项目少于 3 个时,只返回剩余的项目:
In [115]: list(IT.islice(iterator, 3))
Out[115]: [9]
如果你再次调用它,你会得到一个空列表:
In [116]: list(IT.islice(iterable, 3))
Out[116]: []
lambda: list(IT.islice(iterator, chunksize))
是一个在调用时 returns list(IT.islice(iterator, chunksize))
的函数。它是一个 "one-liner" 相当于
def func():
return list(IT.islice(iterator, chunksize))
最后,iter(callable, sentinel)
returns 另一个迭代器。此迭代器生成的值是可调用对象返回的值。它不断产生值,直到可调用 returns 的值等于哨兵。所以
iter(lambda: list(IT.islice(iterator, chunksize)), [])
将继续返回值 list(IT.islice(iterator, chunksize))
直到该值为空列表:
In [121]: iterator = iter(range(10))
In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), []))
Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
我正在使用以下代码将 CSV 文件拆分为多个块(来自 here)
def worker(chunk):
print len(chunk)
def keyfunc(row):
return row[0]
def main():
pool = mp.Pool()
largefile = 'Counseling.csv'
num_chunks = 10
start_time = time.time()
results = []
with open(largefile) as f:
reader = csv.reader(f)
reader.next()
chunks = itertools.groupby(reader, keyfunc)
while True:
# make a list of num_chunks chunks
groups = [list(chunk) for key, chunk in
itertools.islice(chunks, num_chunks)]
if groups:
result = pool.map(worker, groups)
results.extend(result)
else:
break
pool.close()
pool.join()
但是,无论我选择使用多少块,块的数量似乎总是保持不变。例如,无论我选择有 1 个还是 10 个块,我在处理样本文件时总是得到这个输出。理想情况下,我想对文件进行分块,以便公平分布。
请注意,我分块的真实文件超过 1300 万行,这就是我逐个处理它的原因。那是必须的!
6
7
1
...
1
1
94
--- 0.101687192917 seconds ---
首先,如果记录尚未在键列上排序,itertools.groupby 将没有任何实际意义。 而且,如果你的需求只是将csv文件分块成预定行数交给worker,那么你就不用做这些了。
一个简单的实现是:
import csv
from multiprocessing import Pool
def worker(chunk):
print len(chunk)
def emit_chunks(chunk_size, file_path):
lines_count = 0
with open(file_path) as f:
reader = csv.reader(f)
chunk = []
for line in reader:
lines_count += 1
chunk.append(line)
if lines_count == chunk_size:
lines_count = 0
yield chunk
chunk = []
else:
continue
if chunk : yield chunk
def main():
chunk_size = 10
gen = emit_chunks(chunk_size, 'c:/Temp/in.csv')
p = Pool(5)
p.imap(worker, gen)
print 'Completed..'
*编辑:改为 pool.imap 而不是 pool.map
根据
评论,
我们希望每个进程都在 10000 行的块上工作。这并不难
去做;请参阅下面的 iter/islice
食谱。但是,使用
pool.map(worker, ten_thousand_row_chunks)
是pool.map
将尝试将所有块放入任务队列
马上。如果这需要比可用内存更多的内存,那么你会得到一个
MemoryError
。 (注:pool.imap
suffers from the same problem。)
因此,我们需要在每个块的片段上迭代调用 pool.map
。
import itertools as IT
import multiprocessing as mp
import csv
def worker(chunk):
return len(chunk)
def main():
# num_procs is the number of workers in the pool
num_procs = mp.cpu_count()
# chunksize is the number of lines in a chunk
chunksize = 10**5
pool = mp.Pool(num_procs)
largefile = 'Counseling.csv'
results = []
with open(largefile, 'rb') as f:
reader = csv.reader(f)
for chunk in iter(lambda: list(IT.islice(reader, chunksize*num_procs)), []):
chunk = iter(chunk)
pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), []))
result = pool.map(worker, pieces)
results.extend(result)
print(results)
pool.close()
pool.join()
main()
每个 chunk
将包含文件中最多 chunksize*num_procs
行。
这是足够的数据来为池中的所有工作人员提供一些工作,但不会太大而导致 MemoryError ——前提是 chunksize
没有设置得太大。
然后每个 chunk
被分解成碎片,每个碎片最多包含
文件中的 chunksize
行。然后将这些片段发送到 pool.map
.
iter(lambda: list(IT.islice(iterator, chunksize)), [])
如何工作:
这是一个习惯用法,用于将迭代器分组为长度为 chunksize 的块。 让我们看看它是如何在一个例子中工作的:
In [111]: iterator = iter(range(10))
请注意,每次调用 IT.islice(iterator, 3)
时,一个包含 3 个项目的新块
从迭代器中切出:
In [112]: list(IT.islice(iterator, 3))
Out[112]: [0, 1, 2]
In [113]: list(IT.islice(iterator, 3))
Out[113]: [3, 4, 5]
In [114]: list(IT.islice(iterator, 3))
Out[114]: [6, 7, 8]
当迭代器中剩余的项目少于 3 个时,只返回剩余的项目:
In [115]: list(IT.islice(iterator, 3))
Out[115]: [9]
如果你再次调用它,你会得到一个空列表:
In [116]: list(IT.islice(iterable, 3))
Out[116]: []
lambda: list(IT.islice(iterator, chunksize))
是一个在调用时 returns list(IT.islice(iterator, chunksize))
的函数。它是一个 "one-liner" 相当于
def func():
return list(IT.islice(iterator, chunksize))
最后,iter(callable, sentinel)
returns 另一个迭代器。此迭代器生成的值是可调用对象返回的值。它不断产生值,直到可调用 returns 的值等于哨兵。所以
iter(lambda: list(IT.islice(iterator, chunksize)), [])
将继续返回值 list(IT.islice(iterator, chunksize))
直到该值为空列表:
In [121]: iterator = iter(range(10))
In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), []))
Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]