如何使用 python-multiprocessing 来连接多个 files/dataframes?

How to use python-multiprocessing to concat many files/dataframes?

我对 python 和编程比较陌生,只是用它来分析模拟数据。 我有一个目录“result_1/”,其中包含超过 150000 个带有模拟数据的 CSV 文件,我想将其连接到一个 pandas-dataFrame 中。为了避免 readdir() 一次只读取 32K 目录条目的问题,我准备了“files.csv”——列出目录中的所有文件。

(“sim”、“det”和“运行”是我从文件名中读取的信息片段,并作为 Series 插入到 dataFrame 中。为了更好地忽略,我将它们的定义从连接。)

我的问题如下: 该程序花费了太多时间 运行,我想使用 multiprocessing/-threading 来加速 for-loop,但由于我以前从未使用过 mp/mt,我什至不知道是否或者如何在此处使用它。

提前谢谢你,祝你有美好的一天!

import numpy as np                          
import pandas as pd                         
import os
import multiprocessing as mp

df = pd.DataFrame()
path = 'result_1/'
list = pd.read_csv('files.csv', encoding='utf_16_le', names=['f'])['f'].values.tolist()

for file in list:
    dftemp = pd.read_csv(r'{}'.format(os.path.join(path, file)), skiprows=8, names=['x', 'y', 'z', 'dos'], sep=',').drop(['y', 'z'], axis=1)
    sim = pd.Series(int(file.split('Nr')[1].split('_')[0]) * np.ones((300,), dtype=int))
    det = pd.Series(int(file.split('Nr')[0]) * np.ones((300,), dtype=int))
    run = pd.Series(int(file[-8:-4]) * np.ones((300,), dtype=int))
    dftemp = pd.concat([sim, det, run, dftemp], axis=1)
    df = pd.concat([df, dftemp], axis=0)

df.rename({0:'sim', 1:'det', 2:'run', 3:'x', 4:'dos'}, axis=1).to_csv(r'df.csv')

CSV 文件如下所示:“193Nr6_Run_0038.csv”(f.e.)

#(8 lines of things I don't need.)
0, 0, 0, 4.621046656438921e-09
1, 0, 0, 4.600856584602298e-09
(... 300 lines of data [x, y, z, dose])

由于 CPU 和 RAM 限制,并行处理数据帧可能很困难。我不知道你的硬件规格,也不知道你的 DataFrame 的细节。但是,我会使用多处理来“parse/make”数据帧,然后将它们连接起来。这是一个例子:

import numpy as np                          
import pandas as pd                         
import os
from multiprocessing import Pool


path = 'result_1/'
list_of_files = pd.read_csv('files.csv', encoding='utf_16_le', names=['f'])['f'].values.tolist()

#make a function to replace the for-loop:
def my_custom_func(file):
    dftemp = pd.read_csv(r'{}'.format(os.path.join(path, file)), skiprows=8, names=['x', 'y', 'z', 'dos'], sep=',').drop(['y', 'z'], axis=1)
    sim = pd.Series(int(file.split('Nr')[1].split('_')[0]) * np.ones((300,), dtype=int))
    det = pd.Series(int(file.split('Nr')[0]) * np.ones((300,), dtype=int))
    run = pd.Series(int(file[-8:-4]) * np.ones((300,), dtype=int))
    return pd.concat([sim, det, run, dftemp], axis=1)

#use multiprocessing to process multiple files at once
with Pool(8) as p: #8 processes simultaneously. Avoid using more processes than cores in your CPU
    dataframes = p.map(my_custom_func, list_of_files)

#Finally, concatenate them all
df = pd.concat(dataframes)

df.rename({0:'sim', 1:'det', 2:'run', 3:'x', 4:'dos'}, axis=1).to_csv(r'df.csv')

查看 multiprocessing.Pool() 了解更多信息。