对多个文件使用多重处理的最佳方式

Optimal way to use multiprocessing for many files

所以我有一大堆文件需要处理成 CSV。每个文件本身都很大,每一行都是一个字符串。文件的每一行都可以代表三种数据类型中的一种,每种数据的处理方式都略有不同。我当前的解决方案如下所示:

    type1_columns = [...]
    type2_columns = [...]
    type3_columns = [...]

    file_list = os.listdir(filelist)

    def process_type1_line(json_line):
       #processing logic
       to_append = [a, b, c, d, e]
       type1_series = pd.Series(to_append, index=type1_columns)
       return type1_series


    def process_type2_line(json_line):
       #processing logic
       to_append = [a, b, c, d, e]
       type2_series = pd.Series(to_append, index=type2_columns)
       return type2_series


    def process_type3_line(json_line):
       #processing logic
       to_append = [a, b, c, d, e]
       type3_series = pd.Series(to_append, index=type3_columns)
       return type3_series


    def process_file(file):
        type1_df = pd.DataFrame(columns=type1_columns)
        type2_df = pd.DataFrame(columns=type2_columns)
        type3_df = pd.DataFrame(columns=type3_columns)

        with open(filepath/file) as f:
             data=f.readlines()
             for line in data:
                  #some logic to get the record_type and convert line to json
                  record_type = ...
                  json_line = ...

                  if record_type == "type1":
                       type1_series = process_type1_line(json_line)
                       type1_df = type1_df.append(type1_series, ignore_index=True)
                  if record_type == "type2":
                       type2_series = process_type2_line(json_line)
                       type2_df = type2_df.append(type2_series, ignore_index=True)
                  if record_type == "type3":
                       type3_series = process_type3_line(json_line)
                       type3_df = type3_df.append(type3_series, ignore_index=True)

        type1_df.to_csv(type1_csv_path.csv)
        type2_df.to_csv(type2_csv_path.csv)
        type3_df.to_csv(type3_csv_path.csv)


     for file in file_list:
          process_file(file)

我遍历文件,并为三种不同类型的记录中的每一种创建数据帧。我解析这些行并为每个行调用适当的处理函数。返回的系列附加到该文件的 record_type 的最终数据帧。处理文件后,三个数据帧将保存为 CSV,我们从下一个文件开始。

问题是这种方法花费的时间太长,我需要几周的时间来处理所有文件。

我尝试通过以下方式使用多处理(我没有太多经验)来修改我的方法:

     with ThreadPoolExecutor(max_workers=30) as executor:
          futures = [executor.submit(process_file, file) for file in file_list]

在一些日志记录打印语句中,我可以看到这开始了对 30 个文件的处理,但是 none 已经完成,所以我至少知道我的方法是有缺陷的。谁能解释一下解决这个问题的最佳方法是什么?也许是多处理和异步的某种组合?

你有两个大问题:

  1. 您将整个输入文件加载到内存中,在内存中产生整个结果,然后一次写入整个输出文件。这意味着,如果您有 30 个并行操作的工作人员,则需要与 30 个 (self-described) large 文件成比例的内存。您也将所有数据存储两次,一次是 f.readlines() 返回的 str 行中的 list,然后是三个 DataFrame 中的一个;如果您按原样使用没有执行程序的代码,并且只是更改了:

          data=f.readlines()
          for line in data:
    

    至:

          for line in f:
    

    您会立即将内存使用量减少大约一半,这(可能)足以阻止页面抖动。也就是说,您仍然会使用与文件大小成比例的内存来存储 DataFrame,因此如果您将代码并行化,您将恢复抖动,如果文件足够大,即使没有并行性也可能会抖动。

  2. 你对 每一行 使用 .append,IIRC,对于 DataFrames 是 Schlemiel 的一种形式Painter的算法:每个append做一个brand-newDataFrame,将旧的DataFrame的全部内容加上少量的新数据复制到一个新的DataFrame,随着现有数据越来越大,工作时间越来越长;应该摊销的 O(n) 工作变成 O(n**2) 工作。

在这两者之间,您使用的内存 多于所需, 执行 重复附加的不必要的繁忙工作。并行性可能有助于更快地完成繁重的工作,但作为交换,它会使您的内存需求增加 30 倍;很可能,您没有那么多 RAM(如果这些文件真的很大,您可能没有足够的 RAM 甚至一个文件),并且您最终会出现页面抖动(将内存写入 pagefile/swap文件为其他东西腾出空间,按需读回,并经常丢弃在你用完它之前分页的内存,使内存访问与磁盘性能相关,这比 RAM 访问慢几个数量级) .

我对 Pandas 的了解还不足以说明它是否为您正在做的事情提供了一些更好的、渐进的解决方案,但您并不真的需要一个;只需逐行处理输入,并使用 csv 模块随时写入行。您的内存需求将从“与每个输入文件的大小成比例”下降到“与输入文件的每个 的数据成比例”。

你的 process_file 函数最终看起来像这样:

def process_file(file):
    # Open input file and all output files (newline='' needed to play nice with csv module
    # which takes control of newline format to ensure dialect rules followed precisely,
    # regardless of OS line separator rules)
    with open(filepath/file) as f,\
         open(type1_csv_path, 'w', newline='') as type1f,\
         open(type2_csv_path, 'w', newline='') as type2f,\
         open(type3_csv_path, 'w', newline='') as type3f:
         csv1 = csv.writer(type1f)
         csv1.writerow(type1_columns)  # Omit if no useful column header
         csv2 = csv.writer(type2f)
         csv2.writerow(type2_columns)  # Omit if no useful column header
         csv3 = csv.writer(type3f)
         csv3.writerow(type3_columns)  # Omit if no useful column header
         for line in f:  # Directly iterating file object lazily fetches line at a time
                         # where .readlines() eagerly fetches whole file, requiring
                         # a ton of memory for no reason
              #some logic to get the record_type and convert line to json
              record_type = ...
              json_line = ...

              if record_type == "type1":
                   type1_series = process_type1_line(json_line)
                   csv1.writerow(type1_series)  # Might need to convert to plain list if Series
                                                # doesn't iterate the values you need directly
              elif record_type == "type2":
                   type2_series = process_type2_line(json_line)
                   csv2.writerow(type2_series)
              elif record_type == "type3":
                   type3_series = process_type3_line(json_line)
                   csv3.writerow(type3_series)

如果它按原样工作(没有执行程序),就那样使用它。如果你在没有执行者的情况下页面抖动,或者文件足够大以至于重复的 appends 严重伤害你,这可能足以让它自己工作。如果它太慢,执行器 可能 提供一点好处,如果你正在做 很多 的工作来将每一行处理成输出格式(因为虽然大多数工作人员都在处理,但一两个工作人员可以充分共享磁盘访问以进行读取和写入),但是如果每行的处理工作很低,那么少数工作人员(我会从两三个开始)会只会增加磁盘争用(特别是如果您使用的是旋转磁盘硬盘驱动器,而不是 SSD),并行性要么无济于事,要么会造成积极伤害。

您可能需要调整使用的确切 CSV 方言(作为参数传递给 csv.writer),并可能为输出文件显式设置特定的 encoding 而不是语言环境默认值(例如传递encoding='utf-8'encoding='utf-16'opens 进行写入,因此它总是以 .csv 文件的消费者期望的编码写入),但这是一般形式去争取。