对多个文件使用多重处理的最佳方式
Optimal way to use multiprocessing for many files
所以我有一大堆文件需要处理成 CSV。每个文件本身都很大,每一行都是一个字符串。文件的每一行都可以代表三种数据类型中的一种,每种数据的处理方式都略有不同。我当前的解决方案如下所示:
type1_columns = [...]
type2_columns = [...]
type3_columns = [...]
file_list = os.listdir(filelist)
def process_type1_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type1_series = pd.Series(to_append, index=type1_columns)
return type1_series
def process_type2_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type2_series = pd.Series(to_append, index=type2_columns)
return type2_series
def process_type3_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type3_series = pd.Series(to_append, index=type3_columns)
return type3_series
def process_file(file):
type1_df = pd.DataFrame(columns=type1_columns)
type2_df = pd.DataFrame(columns=type2_columns)
type3_df = pd.DataFrame(columns=type3_columns)
with open(filepath/file) as f:
data=f.readlines()
for line in data:
#some logic to get the record_type and convert line to json
record_type = ...
json_line = ...
if record_type == "type1":
type1_series = process_type1_line(json_line)
type1_df = type1_df.append(type1_series, ignore_index=True)
if record_type == "type2":
type2_series = process_type2_line(json_line)
type2_df = type2_df.append(type2_series, ignore_index=True)
if record_type == "type3":
type3_series = process_type3_line(json_line)
type3_df = type3_df.append(type3_series, ignore_index=True)
type1_df.to_csv(type1_csv_path.csv)
type2_df.to_csv(type2_csv_path.csv)
type3_df.to_csv(type3_csv_path.csv)
for file in file_list:
process_file(file)
我遍历文件,并为三种不同类型的记录中的每一种创建数据帧。我解析这些行并为每个行调用适当的处理函数。返回的系列附加到该文件的 record_type 的最终数据帧。处理文件后,三个数据帧将保存为 CSV,我们从下一个文件开始。
问题是这种方法花费的时间太长,我需要几周的时间来处理所有文件。
我尝试通过以下方式使用多处理(我没有太多经验)来修改我的方法:
with ThreadPoolExecutor(max_workers=30) as executor:
futures = [executor.submit(process_file, file) for file in file_list]
在一些日志记录打印语句中,我可以看到这开始了对 30 个文件的处理,但是 none 已经完成,所以我至少知道我的方法是有缺陷的。谁能解释一下解决这个问题的最佳方法是什么?也许是多处理和异步的某种组合?
你有两个大问题:
您将整个输入文件加载到内存中,在内存中产生整个结果,然后一次写入整个输出文件。这意味着,如果您有 30 个并行操作的工作人员,则需要与 30 个 (self-described) large 文件成比例的内存。您也将所有数据存储两次,一次是 f.readlines()
返回的 str
行中的 list
,然后是三个 DataFrame
中的一个;如果您按原样使用没有执行程序的代码,并且只是更改了:
data=f.readlines()
for line in data:
至:
for line in f:
您会立即将内存使用量减少大约一半,这(可能)足以阻止页面抖动。也就是说,您仍然会使用与文件大小成比例的内存来存储 DataFrame
,因此如果您将代码并行化,您将恢复抖动,如果文件足够大,即使没有并行性也可能会抖动。
你对 每一行 使用 .append
,IIRC,对于 DataFrame
s 是 Schlemiel 的一种形式Painter的算法:每个append
做一个brand-newDataFrame
,将旧的DataFrame
的全部内容加上少量的新数据复制到一个新的DataFrame
,随着现有数据越来越大,工作时间越来越长;应该摊销的 O(n)
工作变成 O(n**2)
工作。
在这两者之间,您使用的内存 多于所需, 和 执行 吨 重复附加的不必要的繁忙工作。并行性可能有助于更快地完成繁重的工作,但作为交换,它会使您的内存需求增加 30 倍;很可能,您没有那么多 RAM(如果这些文件真的很大,您可能没有足够的 RAM 甚至一个文件),并且您最终会出现页面抖动(将内存写入 pagefile/swap文件为其他东西腾出空间,按需读回,并经常丢弃在你用完它之前分页的内存,使内存访问与磁盘性能相关,这比 RAM 访问慢几个数量级) .
我对 Pandas 的了解还不足以说明它是否为您正在做的事情提供了一些更好的、渐进的解决方案,但您并不真的需要一个;只需逐行处理输入,并使用 csv
模块随时写入行。您的内存需求将从“与每个输入文件的大小成比例”下降到“与输入文件的每个 行 的数据成比例”。
你的 process_file
函数最终看起来像这样:
def process_file(file):
# Open input file and all output files (newline='' needed to play nice with csv module
# which takes control of newline format to ensure dialect rules followed precisely,
# regardless of OS line separator rules)
with open(filepath/file) as f,\
open(type1_csv_path, 'w', newline='') as type1f,\
open(type2_csv_path, 'w', newline='') as type2f,\
open(type3_csv_path, 'w', newline='') as type3f:
csv1 = csv.writer(type1f)
csv1.writerow(type1_columns) # Omit if no useful column header
csv2 = csv.writer(type2f)
csv2.writerow(type2_columns) # Omit if no useful column header
csv3 = csv.writer(type3f)
csv3.writerow(type3_columns) # Omit if no useful column header
for line in f: # Directly iterating file object lazily fetches line at a time
# where .readlines() eagerly fetches whole file, requiring
# a ton of memory for no reason
#some logic to get the record_type and convert line to json
record_type = ...
json_line = ...
if record_type == "type1":
type1_series = process_type1_line(json_line)
csv1.writerow(type1_series) # Might need to convert to plain list if Series
# doesn't iterate the values you need directly
elif record_type == "type2":
type2_series = process_type2_line(json_line)
csv2.writerow(type2_series)
elif record_type == "type3":
type3_series = process_type3_line(json_line)
csv3.writerow(type3_series)
如果它按原样工作(没有执行程序),就那样使用它。如果你在没有执行者的情况下页面抖动,或者文件足够大以至于重复的 append
s 严重伤害你,这可能足以让它自己工作。如果它太慢,执行器 可能 提供一点好处,如果你正在做 很多 的工作来将每一行处理成输出格式(因为虽然大多数工作人员都在处理,但一两个工作人员可以充分共享磁盘访问以进行读取和写入),但是如果每行的处理工作很低,那么少数工作人员(我会从两三个开始)会只会增加磁盘争用(特别是如果您使用的是旋转磁盘硬盘驱动器,而不是 SSD),并行性要么无济于事,要么会造成积极伤害。
您可能需要调整使用的确切 CSV 方言(作为参数传递给 csv.writer
),并可能为输出文件显式设置特定的 encoding
而不是语言环境默认值(例如传递encoding='utf-8'
或 encoding='utf-16'
到 open
s 进行写入,因此它总是以 .csv
文件的消费者期望的编码写入),但这是一般形式去争取。
所以我有一大堆文件需要处理成 CSV。每个文件本身都很大,每一行都是一个字符串。文件的每一行都可以代表三种数据类型中的一种,每种数据的处理方式都略有不同。我当前的解决方案如下所示:
type1_columns = [...]
type2_columns = [...]
type3_columns = [...]
file_list = os.listdir(filelist)
def process_type1_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type1_series = pd.Series(to_append, index=type1_columns)
return type1_series
def process_type2_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type2_series = pd.Series(to_append, index=type2_columns)
return type2_series
def process_type3_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type3_series = pd.Series(to_append, index=type3_columns)
return type3_series
def process_file(file):
type1_df = pd.DataFrame(columns=type1_columns)
type2_df = pd.DataFrame(columns=type2_columns)
type3_df = pd.DataFrame(columns=type3_columns)
with open(filepath/file) as f:
data=f.readlines()
for line in data:
#some logic to get the record_type and convert line to json
record_type = ...
json_line = ...
if record_type == "type1":
type1_series = process_type1_line(json_line)
type1_df = type1_df.append(type1_series, ignore_index=True)
if record_type == "type2":
type2_series = process_type2_line(json_line)
type2_df = type2_df.append(type2_series, ignore_index=True)
if record_type == "type3":
type3_series = process_type3_line(json_line)
type3_df = type3_df.append(type3_series, ignore_index=True)
type1_df.to_csv(type1_csv_path.csv)
type2_df.to_csv(type2_csv_path.csv)
type3_df.to_csv(type3_csv_path.csv)
for file in file_list:
process_file(file)
我遍历文件,并为三种不同类型的记录中的每一种创建数据帧。我解析这些行并为每个行调用适当的处理函数。返回的系列附加到该文件的 record_type 的最终数据帧。处理文件后,三个数据帧将保存为 CSV,我们从下一个文件开始。
问题是这种方法花费的时间太长,我需要几周的时间来处理所有文件。
我尝试通过以下方式使用多处理(我没有太多经验)来修改我的方法:
with ThreadPoolExecutor(max_workers=30) as executor:
futures = [executor.submit(process_file, file) for file in file_list]
在一些日志记录打印语句中,我可以看到这开始了对 30 个文件的处理,但是 none 已经完成,所以我至少知道我的方法是有缺陷的。谁能解释一下解决这个问题的最佳方法是什么?也许是多处理和异步的某种组合?
你有两个大问题:
您将整个输入文件加载到内存中,在内存中产生整个结果,然后一次写入整个输出文件。这意味着,如果您有 30 个并行操作的工作人员,则需要与 30 个 (self-described) large 文件成比例的内存。您也将所有数据存储两次,一次是
f.readlines()
返回的str
行中的list
,然后是三个DataFrame
中的一个;如果您按原样使用没有执行程序的代码,并且只是更改了:data=f.readlines() for line in data:
至:
for line in f:
您会立即将内存使用量减少大约一半,这(可能)足以阻止页面抖动。也就是说,您仍然会使用与文件大小成比例的内存来存储
DataFrame
,因此如果您将代码并行化,您将恢复抖动,如果文件足够大,即使没有并行性也可能会抖动。你对 每一行 使用
.append
,IIRC,对于DataFrame
s 是 Schlemiel 的一种形式Painter的算法:每个append
做一个brand-newDataFrame
,将旧的DataFrame
的全部内容加上少量的新数据复制到一个新的DataFrame
,随着现有数据越来越大,工作时间越来越长;应该摊销的O(n)
工作变成O(n**2)
工作。
在这两者之间,您使用的内存 多于所需, 和 执行 吨 重复附加的不必要的繁忙工作。并行性可能有助于更快地完成繁重的工作,但作为交换,它会使您的内存需求增加 30 倍;很可能,您没有那么多 RAM(如果这些文件真的很大,您可能没有足够的 RAM 甚至一个文件),并且您最终会出现页面抖动(将内存写入 pagefile/swap文件为其他东西腾出空间,按需读回,并经常丢弃在你用完它之前分页的内存,使内存访问与磁盘性能相关,这比 RAM 访问慢几个数量级) .
我对 Pandas 的了解还不足以说明它是否为您正在做的事情提供了一些更好的、渐进的解决方案,但您并不真的需要一个;只需逐行处理输入,并使用 csv
模块随时写入行。您的内存需求将从“与每个输入文件的大小成比例”下降到“与输入文件的每个 行 的数据成比例”。
你的 process_file
函数最终看起来像这样:
def process_file(file):
# Open input file and all output files (newline='' needed to play nice with csv module
# which takes control of newline format to ensure dialect rules followed precisely,
# regardless of OS line separator rules)
with open(filepath/file) as f,\
open(type1_csv_path, 'w', newline='') as type1f,\
open(type2_csv_path, 'w', newline='') as type2f,\
open(type3_csv_path, 'w', newline='') as type3f:
csv1 = csv.writer(type1f)
csv1.writerow(type1_columns) # Omit if no useful column header
csv2 = csv.writer(type2f)
csv2.writerow(type2_columns) # Omit if no useful column header
csv3 = csv.writer(type3f)
csv3.writerow(type3_columns) # Omit if no useful column header
for line in f: # Directly iterating file object lazily fetches line at a time
# where .readlines() eagerly fetches whole file, requiring
# a ton of memory for no reason
#some logic to get the record_type and convert line to json
record_type = ...
json_line = ...
if record_type == "type1":
type1_series = process_type1_line(json_line)
csv1.writerow(type1_series) # Might need to convert to plain list if Series
# doesn't iterate the values you need directly
elif record_type == "type2":
type2_series = process_type2_line(json_line)
csv2.writerow(type2_series)
elif record_type == "type3":
type3_series = process_type3_line(json_line)
csv3.writerow(type3_series)
如果它按原样工作(没有执行程序),就那样使用它。如果你在没有执行者的情况下页面抖动,或者文件足够大以至于重复的 append
s 严重伤害你,这可能足以让它自己工作。如果它太慢,执行器 可能 提供一点好处,如果你正在做 很多 的工作来将每一行处理成输出格式(因为虽然大多数工作人员都在处理,但一两个工作人员可以充分共享磁盘访问以进行读取和写入),但是如果每行的处理工作很低,那么少数工作人员(我会从两三个开始)会只会增加磁盘争用(特别是如果您使用的是旋转磁盘硬盘驱动器,而不是 SSD),并行性要么无济于事,要么会造成积极伤害。
您可能需要调整使用的确切 CSV 方言(作为参数传递给 csv.writer
),并可能为输出文件显式设置特定的 encoding
而不是语言环境默认值(例如传递encoding='utf-8'
或 encoding='utf-16'
到 open
s 进行写入,因此它总是以 .csv
文件的消费者期望的编码写入),但这是一般形式去争取。