`multiprocessing.Pool.map()` 似乎安排错了
`multiprocessing.Pool.map()` seems to schedule wrongly
我有一个请求服务器的功能,检索一些数据,处理它并保存一个 csv 文件。此功能应启动 20k 次。每次执行的持续时间都不同:有时持续超过 20 分钟,而其他则不到一秒。我决定使用 multiprocessing.Pool.map
来并行执行。我的代码如下所示:
def get_data_and_process_it(filename):
print('getting', filename)
...
print(filename, 'has been process')
with Pool(8) as p:
p.map(get_data_and_process_it, long_list_of_filenames)
看看 prints
是如何生成的,似乎 long_list_of_filenames
它已被分成 8 个部分并分配给每个 CPU
因为有时只是在 20 分钟的执行中被阻塞在那 20 分钟内没有处理 long_list_of_filenames
的其他元素。我所期望的是 map
以 FIFO 样式安排 cpu 核心中的每个元素。
我的情况有更好的方法吗?
map
是阻塞,您可以使用 p.map_async
而不是 p.map
。 map
将等待所有这些函数调用完成,以便我们连续看到所有结果。 map_async
以随机顺序执行工作,不等待正在进行的任务完成后再开始新任务。这是最快的方法。(For more) There is also a 详细讨论了 map
和 map_async
。
多处理池class 为我们处理排队逻辑。它非常适合 运行 并行(示例)网络抓取作业或任何可以独立分解和分发的作业。如果您需要对队列进行更多控制或需要在多个进程之间共享数据,您可能需要查看 Queue
class(For more).
map
方法在 所有 操作完成后仅 returns。
并且从池工作人员打印并不理想。一方面,像 stdout
这样的文件使用缓冲,因此在打印一条消息和它实际出现之间可能存在可变的时间量。此外,由于所有 worker 继承相同的 stdout
,他们的输出将变得相互交织,甚至可能出现乱码。
所以我建议改用 imap_unordered
。 return 是一个迭代器,一旦结果可用就会开始产生结果。唯一的问题是,这个 returns 结果的顺序是它们 完成的顺序 ,而不是它们开始的顺序。
您的辅助函数 (get_data_and_process_it
) 应该 return 某种状态指示器。例如文件名和结果的元组。
def get_data_and_process_it(filename):
...
if (error):
return (filename, f'has *failed* bacause of {reason}')
return (filename, 'has been processed')
然后你可以这样做:
with Pool(8) as p:
for fn, res in p.imap_unordered(get_data_and_process_it, long_list_of_filenames):
print(fn, res)
这提供了有关作业何时完成的准确信息,并且由于只有父进程写入 stdout
,所以输出不会出现乱码。
此外,我建议在程序开头的某处使用 sys.stdout.reconfigure(line_buffering=True)
。这确保了 stdout
流将在每一行输出后被刷新。
我有一个请求服务器的功能,检索一些数据,处理它并保存一个 csv 文件。此功能应启动 20k 次。每次执行的持续时间都不同:有时持续超过 20 分钟,而其他则不到一秒。我决定使用 multiprocessing.Pool.map
来并行执行。我的代码如下所示:
def get_data_and_process_it(filename):
print('getting', filename)
...
print(filename, 'has been process')
with Pool(8) as p:
p.map(get_data_and_process_it, long_list_of_filenames)
看看 prints
是如何生成的,似乎 long_list_of_filenames
它已被分成 8 个部分并分配给每个 CPU
因为有时只是在 20 分钟的执行中被阻塞在那 20 分钟内没有处理 long_list_of_filenames
的其他元素。我所期望的是 map
以 FIFO 样式安排 cpu 核心中的每个元素。
我的情况有更好的方法吗?
map
是阻塞,您可以使用 p.map_async
而不是 p.map
。 map
将等待所有这些函数调用完成,以便我们连续看到所有结果。 map_async
以随机顺序执行工作,不等待正在进行的任务完成后再开始新任务。这是最快的方法。(For more) There is also a map
和 map_async
。
多处理池class 为我们处理排队逻辑。它非常适合 运行 并行(示例)网络抓取作业或任何可以独立分解和分发的作业。如果您需要对队列进行更多控制或需要在多个进程之间共享数据,您可能需要查看 Queue
class(For more).
map
方法在 所有 操作完成后仅 returns。
并且从池工作人员打印并不理想。一方面,像 stdout
这样的文件使用缓冲,因此在打印一条消息和它实际出现之间可能存在可变的时间量。此外,由于所有 worker 继承相同的 stdout
,他们的输出将变得相互交织,甚至可能出现乱码。
所以我建议改用 imap_unordered
。 return 是一个迭代器,一旦结果可用就会开始产生结果。唯一的问题是,这个 returns 结果的顺序是它们 完成的顺序 ,而不是它们开始的顺序。
您的辅助函数 (get_data_and_process_it
) 应该 return 某种状态指示器。例如文件名和结果的元组。
def get_data_and_process_it(filename):
...
if (error):
return (filename, f'has *failed* bacause of {reason}')
return (filename, 'has been processed')
然后你可以这样做:
with Pool(8) as p:
for fn, res in p.imap_unordered(get_data_and_process_it, long_list_of_filenames):
print(fn, res)
这提供了有关作业何时完成的准确信息,并且由于只有父进程写入 stdout
,所以输出不会出现乱码。
此外,我建议在程序开头的某处使用 sys.stdout.reconfigure(line_buffering=True)
。这确保了 stdout
流将在每一行输出后被刷新。