Python multiprocessing: parsing, editing, and writing a long series of csv files
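I have a long series of similar csv files (14 GB in total). I need to open each file, replace certain characters, and write the fixed version to a new file. I want to use the processing power of my multicore computer. I tried both mp.Pool and mp.Process/mp.Queue. The Pool version works, but the Queue approach produces this error: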
IOError: [Errno 22] invalid mode ('r') or filename: '<multiprocessing.queues.Queue object at 0x0000000002775A90>'
Here is a simplified version of my Pool code:
```python
import os
import pandas as pd
import multiprocessing as mp

def fixer(a_file):
    lines = []
    opened_file = open(a_file)
    for each_line in opened_file:
        lines.append(each_line.replace('mad', 'rational'))
    opened_file.close()
    df = pd.DataFrame(lines)
    #some pandas magics here
    df.to_csv(a_file[:-4] + '_fixed.csv')

if __name__ == "__main__":
    my_path = os.getcwd()
    my_files = list(os.walk(my_path))[0][2] #I just get the list of file names here
    processors = mp.cpu_count()
    pool = mp.Pool(processes = processors) # I set as many processes as processors my computer has.
    pool.map(fixer, my_files)
```
And here is the Queue approach:
```python
import os
import pandas as pd
import multiprocessing as mp

def fixer(a_file):
    lines = []
    opened_file = open(a_file)
    for each_line in opened_file:
        lines.append(each_line.replace('mad', 'rational'))
    opened_file.close()
    df = pd.DataFrame(lines)
    #some pandas magics here
    df.to_csv(a_file[:-4] + '_fixed.csv')

if __name__ == "__main__":
    my_path = os.getcwd()
    my_files = list(os.walk(my_path))[0][2] #I just get the list of file names here
    processors = mp.cpu_count()
    queue = mp.Queue()
    for each_file in my_files:
        queue.put(each_file)
    processes = [mp.Process(target = fixer, args=(queue,)) for core in range(processors)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
```
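I would appreciate an example that makes the Queue version work. In a second processing step, before writing the files, I need the processes to obtain intermediate results and do some calculations. That is why I need queues.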
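If the intermediate results only need to reach the parent process, a hand-managed queue may not be required. `Pool.map` sends each call's return value back to the parent and returns the values in input order. The following is a minimal sketch of that alternative, which the question itself does not use. It assumes the intermediate result is a per-file replacement count. `fix_file` and `fix_all` are hypothetical names, and the pandas step is left out.

```python
import multiprocessing as mp
import os


def fix_file(a_file):
    """Hypothetical variant of fixer: write the fixed copy, return a small summary."""
    n = 0
    with open(a_file) as src, open(a_file[:-4] + "_fixed.csv", "w") as dst:
        for line in src:
            n += line.count("mad")
            dst.write(line.replace("mad", "rational"))
    return a_file, n


def fix_all(files, processes=None):
    # processes=None lets Pool use os.cpu_count() workers.
    # map() blocks until every file is done and returns results in input order;
    # leaving the with-block then terminates the (already idle) workers.
    with mp.Pool(processes=processes) as pool:
        return pool.map(fix_file, files)


if __name__ == "__main__":
    files = [f for f in os.listdir(".")
             if f.endswith(".csv") and not f.endswith("_fixed.csv")]
    results = fix_all(files)
    # Second step in the parent (placeholder): any calculation over the results.
    print(sum(n for _, n in results))
```

Because only `(file name, count)` tuples travel back, the file contents stay in the workers.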
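The problem in the queue script was that I was not taking the next element from the queue; instead, I was passing the whole queue to the fixer function. Assigning the value of queue.get() to a variable inside fixer solved it: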
```python
import os
import pandas as pd
import multiprocessing as mp

def fixer(a_queue):
    # Keep pulling file names until the None sentinel arrives; a single
    # get() would let each process handle only one file.
    while True:
        a_file = a_queue.get()
        if a_file is None:
            break
        lines = []
        with open(a_file) as opened_file:
            for each_line in opened_file:
                lines.append(each_line.replace('mad', 'rational'))
        df = pd.DataFrame(lines)
        #some pandas magics here
        df.to_csv(a_file[:-4] + '_fixed.csv')

if __name__ == "__main__":
    my_path = os.getcwd()
    my_files = list(os.walk(my_path))[0][2] #I just get the list of file names here
    processors = mp.cpu_count()
    queue = mp.Queue()
    for each_file in my_files:
        queue.put(each_file)
    # One sentinel per process so that every worker eventually exits.
    for _ in range(processors):
        queue.put(None)
    processes = [mp.Process(target=fixer, args=(queue,)) for core in range(processors)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
```
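The question also describes a second step: getting intermediate results back before doing further calculations. That step can use a second queue that carries results from the workers to the parent. This is a sketch under the following assumptions:

- `fix_line`, `worker`, `run`, and the `None` sentinel are hypothetical names.
- The pandas step is replaced by plain line-by-line writing.
- The "intermediate result" is a per-file replacement count.

Only that small tuple goes back through `result_queue`, so the file contents themselves never pass through a queue.

```python
import multiprocessing as mp
import os


def fix_line(line, old="mad", new="rational"):
    """Hypothetical helper: return (fixed line, number of replacements)."""
    return line.replace(old, new), line.count(old)


def worker(task_queue, result_queue):
    """Fix files until the None sentinel arrives; report one summary per file."""
    while True:
        a_file = task_queue.get()
        if a_file is None:  # sentinel: no more work for this process
            break
        n = 0
        with open(a_file) as src, open(a_file[:-4] + "_fixed.csv", "w") as dst:
            for line in src:
                fixed, k = fix_line(line)
                dst.write(fixed)
                n += k
        # Only a small tuple travels back to the parent, not the file contents.
        result_queue.put((a_file, n))


def run(files, n_workers=None):
    n_workers = n_workers or mp.cpu_count()
    task_queue, result_queue = mp.Queue(), mp.Queue()
    for a_file in files:
        task_queue.put(a_file)
    for _ in range(n_workers):
        task_queue.put(None)  # one sentinel per worker
    workers = [mp.Process(target=worker, args=(task_queue, result_queue))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    # Drain one result per file before join(); joining a process whose
    # queued items are still unread can deadlock (see the multiprocessing docs).
    results = dict(result_queue.get() for _ in files)
    for w in workers:
        w.join()
    return results


if __name__ == "__main__":
    csv_files = [f for f in os.listdir(".")
                 if f.endswith(".csv") and not f.endswith("_fixed.csv")]
    counts = run(csv_files)
    # Placeholder for the second step: any calculation over the summaries.
    print(sum(counts.values()), "replacements in", len(counts), "files")
```

If the real second step needs the fixed data rather than a summary, sending it through `result_queue` would pull all of it into the parent's memory. Writing from the workers and returning only summaries avoids that.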