Python 多处理:每 k 次迭代写入文件

Python Multiprocessing: Writing to file every k iterations

我正在使用 python 3.7 中的 multiprocessing 模块重复并行调用一个函数。我想每 k 次迭代将结果写入一个文件。 (每次可以是不同的文件。)

下面是我的第一次尝试,它基本上循环遍历函数参数集,运行并行处理每个集并将结果写入文件,然后再进入下一个集。这显然是非常低效的。实际上,我的函数 运行 花费的时间要长得多,并且会根据输入值而变化,因此许多处理器在循环迭代之间处于空闲状态。

有没有更有效的方法来实现这个?

import multiprocessing as mp
import numpy as np
import pandas as pd

def myfunction(x): # toy example function
  return(x**2)

for start in np.arange(0,500,100):
    
    with mp.Pool(mp.cpu_count()) as pool:
        out = pool.map(myfunction, np.arange(start, start+100))
    
    pd.DataFrame(out).to_csv('filename_'+str(start//100+1)+'.csv', header=False, index=False) 

我的第一条评论是,如果 myfunction 像您展示的那样微不足道,那么使用多处理时您的性能会更差,因为创建进程池会产生开销(顺便说一下,您在每个循环迭代中不必要地一遍又一遍地创建)并将参数从一个进程传递到另一个进程。

假设 myfunction 是纯 CPU 并且在 map 返回 100 个值后,有机会重叠您未利用的 CSV 文件的写入(目前尚不清楚并发磁盘写入会提高多少性能;这取决于您拥有的驱动器类型、磁头移动等),那么多线程和多处理的组合可能是解决方案。假定 myfunction 为 100% CPU 且不释放全局解释器锁,因此无法占用,您的处理池中的进程数将限制为 CPU 核数利用池大小大于您拥有的 CPU 数量的优势。无论如何,这是我的假设。例如,如果您要使用某些 numpy 函数,那么这是一个错误的假设。另一方面,众所周知 numpy 对其自身的某些处理使用多处理,在这种情况下,结合使用 numpy 和您自己的多处理可能会导致性能下降。您当前的代码仅使用 numpy 来生成范围。这似乎有点矫枉过正,因为还有其他生成范围的方法。我通过定义 STARTSTOP 值和 N_SPLITS 来以稍微不同的方式生成范围,该范围的相等(或尽可能相等)划分数并生成可以转换为范围的起始值和终止值的元组。我希望这不会太混乱。但这似乎是一种更灵活的方法。

在下面的代码中创建了一个线程池和一个处理池。任务被提交到线程池,其中一个参数是处理池,worker 使用 whish 进行 CPU 密集计算,然后当结果被组装时,worker 写出 CSV 文件。

from multiprocessing.pool import Pool, ThreadPool
from multiprocessing import cpu_count
import pandas as pd

def worker(process_pool, index, split_range):
    out = process_pool.map(myfunction, range(*split_range))
    pd.DataFrame(out).to_csv(f'filename_{index}.csv', header=False, index=False)

def myfunction(x): # toy example function
  return(x ** 2)

def split(start, stop, n):
    k, m = divmod(stop - start, n)
    return [(i * k + min(i, m),(i + 1) * k + min(i + 1, m)) for i in range(n)]

def main():
    RANGE_START = 0
    RANGE_STOP = 500
    N_SPLITS = 5
    n_processes = min(N_SPLITS, cpu_count())
    split_ranges = split(RANGE_START, RANGE_STOP, N_SPLITS) # [(0, 100), (100, 200), ... (400, 500)]
    process_pool = Pool(n_processes)
    thread_pool = ThreadPool(N_SPLITS)
    for index, split_range in enumerate(split_ranges):
        thread_pool.apply_async(worker, args=(process_pool, index, split_range))
    # wait for all threading tasks to complete:
    thread_pool.close()
    thread_pool.join()

# required for Windows:
if __name__ == '__main__':
    main()