`multiprocessing.Pool.map()` 似乎安排错了

`multiprocessing.Pool.map()` seems to schedule wrongly

我有一个请求服务器的功能,检索一些数据,处理它并保存一个 csv 文件。此功能应启动 20k 次。每次执行的持续时间都不同:有时持续超过 20 分钟,而其他则不到一秒。我决定使用 multiprocessing.Pool.map 来并行执行。我的代码如下所示:

def get_data_and_process_it(filename):
    print('getting', filename)
    ...
    print(filename, 'has been process')

with Pool(8) as p:
    p.map(get_data_and_process_it, long_list_of_filenames)

看看 prints 是如何生成的,似乎 long_list_of_filenames 它已被分成 8 个部分并分配给每个 CPU 因为有时只是在 20 分钟的执行中被阻塞在那 20 分钟内没有处理 long_list_of_filenames 的其他元素。我所期望的是 map 以 FIFO 样式安排 cpu 核心中的每个元素。

我的情况有更好的方法吗?

map 是阻塞,您可以使用 p.map_async 而不是 p.mapmap 将等待所有这些函数调用完成,以便我们连续看到所有结果。 map_async 以随机顺序执行工作,不等待正在进行的任务完成后再开始新任务。这是最快的方法。(For more) There is also a 详细讨论了 mapmap_async

多处理池class 为我们处理排队逻辑。它非常适合 运行 并行(示例)网络抓取作业或任何可以独立分解和分发的作业。如果您需要对队列进行更多控制或需要在多个进程之间共享数据,您可能需要查看 Queue class(For more).

map 方法在 所有 操作完成后仅 returns。

并且从池工作人员打印并不理想。一方面,像 stdout 这样的文件使用缓冲,因此在打印一条消息和它实际出现之间可能存在可变的时间量。此外,由于所有 worker 继承相同的 stdout,他们的输出将变得相互交织,甚至可能出现乱码。

所以我建议改用 imap_unordered。 return 是一个迭代器,一旦结果可用就会开始产生结果。唯一的问题是,这个 returns 结果的顺序是它们 完成的顺序 ,而不是它们开始的顺序。

您的辅助函数 (get_data_and_process_it) 应该 return 某种状态指示器。例如文件名和结果的元组。

def get_data_and_process_it(filename):
    ...
    if (error):
        return (filename, f'has *failed* bacause of {reason}')
    return (filename, 'has been processed')

然后你可以这样做:

with Pool(8) as p:
   for fn, res in p.imap_unordered(get_data_and_process_it, long_list_of_filenames):
       print(fn, res)

这提供了有关作业何时完成的准确信息,并且由于只有父进程写入 stdout,所以输出不会出现乱码。

此外,我建议在程序开头的某处使用 sys.stdout.reconfigure(line_buffering=True)。这确保了 stdout 流将在每一行输出后被刷新。