如何使用 python-multiprocessing 来连接多个 files/dataframes?
How to use python-multiprocessing to concat many files/dataframes?
我对 python 和编程比较陌生,只是用它来分析模拟数据。
我有一个目录“result_1/”,其中包含超过 150000 个带有模拟数据的 CSV 文件,我想将其连接到一个 pandas-dataFrame 中。为了避免 readdir() 一次只读取 32K 目录条目的问题,我准备了“files.csv”——列出目录中的所有文件。
(“sim”、“det”和“运行”是我从文件名中读取的信息片段,并作为 Series 插入到 dataFrame 中。为了更好地忽略,我将它们的定义从连接。)
我的问题如下:
该程序花费了太多时间 运行,我想使用 multiprocessing/-threading 来加速 for-loop,但由于我以前从未使用过 mp/mt,我什至不知道是否或者如何在此处使用它。
提前谢谢你,祝你有美好的一天!
import numpy as np
import pandas as pd
import os
import multiprocessing as mp
df = pd.DataFrame()
path = 'result_1/'
list = pd.read_csv('files.csv', encoding='utf_16_le', names=['f'])['f'].values.tolist()
for file in list:
dftemp = pd.read_csv(r'{}'.format(os.path.join(path, file)), skiprows=8, names=['x', 'y', 'z', 'dos'], sep=',').drop(['y', 'z'], axis=1)
sim = pd.Series(int(file.split('Nr')[1].split('_')[0]) * np.ones((300,), dtype=int))
det = pd.Series(int(file.split('Nr')[0]) * np.ones((300,), dtype=int))
run = pd.Series(int(file[-8:-4]) * np.ones((300,), dtype=int))
dftemp = pd.concat([sim, det, run, dftemp], axis=1)
df = pd.concat([df, dftemp], axis=0)
df.rename({0:'sim', 1:'det', 2:'run', 3:'x', 4:'dos'}, axis=1).to_csv(r'df.csv')
CSV 文件如下所示:“193Nr6_Run_0038.csv”(f.e.)
#(8 lines of things I don't need.)
0, 0, 0, 4.621046656438921e-09
1, 0, 0, 4.600856584602298e-09
(... 300 lines of data [x, y, z, dose])
由于 CPU 和 RAM 限制,并行处理数据帧可能很困难。我不知道你的硬件规格,也不知道你的 DataFrame 的细节。但是,我会使用多处理来“parse/make”数据帧,然后将它们连接起来。这是一个例子:
import numpy as np
import pandas as pd
import os
from multiprocessing import Pool
path = 'result_1/'
list_of_files = pd.read_csv('files.csv', encoding='utf_16_le', names=['f'])['f'].values.tolist()
#make a function to replace the for-loop:
def my_custom_func(file):
dftemp = pd.read_csv(r'{}'.format(os.path.join(path, file)), skiprows=8, names=['x', 'y', 'z', 'dos'], sep=',').drop(['y', 'z'], axis=1)
sim = pd.Series(int(file.split('Nr')[1].split('_')[0]) * np.ones((300,), dtype=int))
det = pd.Series(int(file.split('Nr')[0]) * np.ones((300,), dtype=int))
run = pd.Series(int(file[-8:-4]) * np.ones((300,), dtype=int))
return pd.concat([sim, det, run, dftemp], axis=1)
#use multiprocessing to process multiple files at once
with Pool(8) as p: #8 processes simultaneously. Avoid using more processes than cores in your CPU
dataframes = p.map(my_custom_func, list_of_files)
#Finally, concatenate them all
df = pd.concat(dataframes)
df.rename({0:'sim', 1:'det', 2:'run', 3:'x', 4:'dos'}, axis=1).to_csv(r'df.csv')
查看 multiprocessing.Pool() 了解更多信息。
我对 python 和编程比较陌生,只是用它来分析模拟数据。 我有一个目录“result_1/”,其中包含超过 150000 个带有模拟数据的 CSV 文件,我想将其连接到一个 pandas-dataFrame 中。为了避免 readdir() 一次只读取 32K 目录条目的问题,我准备了“files.csv”——列出目录中的所有文件。
(“sim”、“det”和“运行”是我从文件名中读取的信息片段,并作为 Series 插入到 dataFrame 中。为了更好地忽略,我将它们的定义从连接。)
我的问题如下: 该程序花费了太多时间 运行,我想使用 multiprocessing/-threading 来加速 for-loop,但由于我以前从未使用过 mp/mt,我什至不知道是否或者如何在此处使用它。
提前谢谢你,祝你有美好的一天!
import numpy as np
import pandas as pd
import os
import multiprocessing as mp
df = pd.DataFrame()
path = 'result_1/'
list = pd.read_csv('files.csv', encoding='utf_16_le', names=['f'])['f'].values.tolist()
for file in list:
dftemp = pd.read_csv(r'{}'.format(os.path.join(path, file)), skiprows=8, names=['x', 'y', 'z', 'dos'], sep=',').drop(['y', 'z'], axis=1)
sim = pd.Series(int(file.split('Nr')[1].split('_')[0]) * np.ones((300,), dtype=int))
det = pd.Series(int(file.split('Nr')[0]) * np.ones((300,), dtype=int))
run = pd.Series(int(file[-8:-4]) * np.ones((300,), dtype=int))
dftemp = pd.concat([sim, det, run, dftemp], axis=1)
df = pd.concat([df, dftemp], axis=0)
df.rename({0:'sim', 1:'det', 2:'run', 3:'x', 4:'dos'}, axis=1).to_csv(r'df.csv')
CSV 文件如下所示:“193Nr6_Run_0038.csv”(f.e.)
#(8 lines of things I don't need.)
0, 0, 0, 4.621046656438921e-09
1, 0, 0, 4.600856584602298e-09
(... 300 lines of data [x, y, z, dose])
由于 CPU 和 RAM 限制,并行处理数据帧可能很困难。我不知道你的硬件规格,也不知道你的 DataFrame 的细节。但是,我会使用多处理来“parse/make”数据帧,然后将它们连接起来。这是一个例子:
import numpy as np
import pandas as pd
import os
from multiprocessing import Pool
path = 'result_1/'
list_of_files = pd.read_csv('files.csv', encoding='utf_16_le', names=['f'])['f'].values.tolist()
#make a function to replace the for-loop:
def my_custom_func(file):
dftemp = pd.read_csv(r'{}'.format(os.path.join(path, file)), skiprows=8, names=['x', 'y', 'z', 'dos'], sep=',').drop(['y', 'z'], axis=1)
sim = pd.Series(int(file.split('Nr')[1].split('_')[0]) * np.ones((300,), dtype=int))
det = pd.Series(int(file.split('Nr')[0]) * np.ones((300,), dtype=int))
run = pd.Series(int(file[-8:-4]) * np.ones((300,), dtype=int))
return pd.concat([sim, det, run, dftemp], axis=1)
#use multiprocessing to process multiple files at once
with Pool(8) as p: #8 processes simultaneously. Avoid using more processes than cores in your CPU
dataframes = p.map(my_custom_func, list_of_files)
#Finally, concatenate them all
df = pd.concat(dataframes)
df.rename({0:'sim', 1:'det', 2:'run', 3:'x', 4:'dos'}, axis=1).to_csv(r'df.csv')
查看 multiprocessing.Pool() 了解更多信息。