Python multitprocessing 处理文件

Python multitprocessing to process files

我以前从未对多处理做过任何事情,但我最近 运行 遇到了一个问题,我的一个项目花费了过多的时间 运行。我有大约 336,000 个文件需要处理,传统的 for 循环可能需要大约一周的时间才能 运行.

有两个循环可以执行此操作,但它们实际上是相同的return,所以我只包含一个。

import json
import os
from tqdm import tqdm
import multiprocessing as mp

jsons = os.listdir('/content/drive/My Drive/mrp_workflow/JSONs')

materials = [None] * len(jsons)

def asyncJSONs(file, index):
  try:
    with open('/content/drive/My Drive/mrp_workflow/JSONs/{}'.format(file)) as f:
        data = json.loads(f.read())
    properties = process_dict(data, {})
    properties['name'] = file.split('.')[0]
    materials[index] = properties
  except:
    print("Error parsing at {}".format(file))

process_list = []
i = 0
for file in tqdm(jsons):
    p = mp.Process(target=asyncJSONs,args=(file,i))
    p.start()
    process_list.append(p)
    i += 1

for process in process_list:
  process.join()

与多处理相关的所有内容都是从 google 搜索和文章的集合中拼凑而成的,所以如果它不完全正确我也不会感到惊讶。例如,'i' 变量是一种以某种顺序保持信息的肮脏尝试。

我想做的是从那些 JSON 文件中加载信息并将其存储在材料变量中。但是当我 运行 我当前的代码没有存储在材料中。

您需要了解多处理的工作原理。它为每项任务启动了一个全新的流程,每个任务都有一个全新的 Python 解释器,运行 重新编写您的脚本。这些进程不以任何方式共享内存。其他进程获得了你的全局变量的副本,但它们显然不能是相同的内存。

如果需要发回信息,可以使用multiprocessing.queue。让函数将结果填充到队列中,而您的主代码等待内容神奇地出现在队列中。

另请阅读 multiprocessing 文档中关于 main 的说明。每个新进程都会重新执行主文件中的所有代码。因此,任何一次性的东西绝对必须包含在 a

if __name__ == "__main__":

阻止。在这种情况下,将主线代码放入名为 main() 的函数中的做法是“最佳做法”。


一直在这里做什么?是在读文件吗?如果是这样,那么您可以使用多线程而不是多处理来做到这一点。但是,如果您受到磁盘速度的限制,那么再多的多处理也不会减少您的 运行 时间。

将提及解决此问题的一种完全不同的方法。不要费心尝试将所有数据附加到同一个列表。提取您需要的数据,并以 ndjson/jsonlines 格式将其附加到某个目标文件。就是这样,而不是 json 数组 [{},{}...] 的对象部分,每一行都有单独的对象。

{"foo": "bar"} 
{"foo": "spam"} 
{"eggs": "jam"} 

工作流程如下所示:

  1. 使用要处理的文件清单和要写入的输出文件生成 N 个工人。你甚至不需要 MP,你可以使用像 rush 这样的工具来并行化。
  2. worker解析数据,生成输出字典
  3. worker 打开带有附加标志的输出文件。转储数据并立即刷新:
with open(out_file, 'a') as fp: 
  print(json.dumps(data), file=fp, flush=True) 

Flush 确保只要你的数据小于你内核的缓冲区大小(通常是几MB),你的不同进程就不会互相踩踏和写入冲突。如果它们确实发生冲突,您可能需要为每个 worker 写入一个单独的输出文件,然后将它们全部加入。

如果需要,您可以使用 jq 将文件 and/or 转换为常规 JSON 数组。老实说,拥抱 json 行。对于长对象列表,这是一种更好的数据格式,因为您不必在内存中解析整个对象。

正如您在其他答案中看到的那样 - 进程不共享内存,您不能直接在 materials 中设置值。函数必须使用 return 将结果发送回主进程,并且必须等待结果并获取它。

Pool可以更简单。它不需要手动使用 queue。它应该 return 结果与 all_jsons 中数据的顺序相同。并且你可以同时设置多少个进程运行这样它就不会阻塞CPU系统中的其他进程。

但是不能用tqdm.

我无法测试它,但它可以是这样的

import os
import json
from multiprocessing import Pool

# --- functions ---

def asyncJSONs(filename):
  try:
    fullpath = os.path.join(folder, filename)
    with open(fullpath) as f:
        data = json.loads(f.read())
    properties = process_dict(data, {})
    properties['name'] = filename.split('.')[0]
    return properties
  except:
    print("Error parsing at {}".format(filename))

# --- main ---

# for all processes (on some systems it may have to be outside `__main__`)
folder = '/content/drive/My Drive/mrp_workflow/JSONs'

if __name__ == '__main__':
    # code only for main process
    
    all_jsons = os.listdir(folder)

    with Pool(5) as p:
        materials = p.map(asyncJSONs, all_jsons)

    for item in materials:
        print(item)    

顺便说一句:

其他模块:concurrent.futures, joblib, ray,