如何减少将 dask 数据帧转换为 pandas 数据帧所花费的时间
How to reduce time taken by to convert dask dataframe to pandas dataframe
我有一个函数可以使用 dask 数据帧读取大型 csv 文件,然后转换为 pandas 数据帧,这需要花费很多时间。代码是:
def t_createdd(Path):
dataframe = dd.read_csv(Path, sep = chr(1), encoding = "utf-16")
return dataframe
#Get the latest file
Array_EXT = "Export_GTT_Tea2Array_*.csv"
array_csv_files = sorted([file
for path, subdir, files in os.walk(PATH)
for file in glob(os.path.join(path, Array_EXT))])
latest_Tea2Array=array_csv_files[(len(array_csv_files)-(58+25)):
(len(array_csv_files)-58)]
Tea2Array_latest = t_createdd(latest_Tea2Array)
#keep only the required columns
Tea2Array = Tea2Array_latest[['Parameter_Id','Reading_Id','X','Value']]
P1MI3 = Tea2Array.loc[Tea2Array['parameter_id']==168566]
P1MI3=P1MI3.compute()
P1MJC_main = Tea2Array.loc[Tea2Array['parameter_id']==168577]
P1MJC_old=P1MJC_main.compute()
P1MI3=P1MI3.compute()
和 P1MJC_old=P1MJC_main.compute()
分别需要 10
和 11
分钟来执行。有什么办法可以缩短时间吗
我鼓励您参考 Dask 文档,考虑为什么您期望该过程比单独使用 Pandas 更快。
考虑:
- 文件访问可能来自多个线程,但您只有一个磁盘接口瓶颈,并且顺序读取的性能可能比尝试并行读取多个文件要好得多
- 读取 CSV 是 CPU-繁重的,需要 python GIL。多线程实际上不会运行并行
- 当你计算时,你实现了整个数据框。的确,您似乎在每种情况下都选择了一行,但 Dask 无法知道它在哪 file/part。
- 你调用了两次计算,但可以将它们结合起来:Dask 努力从内存中逐出当前任何计算都不需要的数据,所以你做了双倍的工作。通过在两个输出上调用计算,您可以将时间减半。
进一步说明:
- 显然,如果您知道哪个分区包含什么,您会做得更好
- 您可以使用进程绕过 GIL,例如 Dask 的分布式调度程序
- 如果你只需要某些列,不要费心加载所有内容然后再选择,将这些列直接包含在 read_csv 函数中,节省大量时间和内存(对于 pandas或达斯克)。
同时计算两个惰性事物:
dask.compute(P1MI3, P1MJC_main)
我有一个函数可以使用 dask 数据帧读取大型 csv 文件,然后转换为 pandas 数据帧,这需要花费很多时间。代码是:
def t_createdd(Path):
dataframe = dd.read_csv(Path, sep = chr(1), encoding = "utf-16")
return dataframe
#Get the latest file
Array_EXT = "Export_GTT_Tea2Array_*.csv"
array_csv_files = sorted([file
for path, subdir, files in os.walk(PATH)
for file in glob(os.path.join(path, Array_EXT))])
latest_Tea2Array=array_csv_files[(len(array_csv_files)-(58+25)):
(len(array_csv_files)-58)]
Tea2Array_latest = t_createdd(latest_Tea2Array)
#keep only the required columns
Tea2Array = Tea2Array_latest[['Parameter_Id','Reading_Id','X','Value']]
P1MI3 = Tea2Array.loc[Tea2Array['parameter_id']==168566]
P1MI3=P1MI3.compute()
P1MJC_main = Tea2Array.loc[Tea2Array['parameter_id']==168577]
P1MJC_old=P1MJC_main.compute()
P1MI3=P1MI3.compute()
和 P1MJC_old=P1MJC_main.compute()
分别需要 10
和 11
分钟来执行。有什么办法可以缩短时间吗
我鼓励您参考 Dask 文档,考虑为什么您期望该过程比单独使用 Pandas 更快。 考虑:
- 文件访问可能来自多个线程,但您只有一个磁盘接口瓶颈,并且顺序读取的性能可能比尝试并行读取多个文件要好得多
- 读取 CSV 是 CPU-繁重的,需要 python GIL。多线程实际上不会运行并行
- 当你计算时,你实现了整个数据框。的确,您似乎在每种情况下都选择了一行,但 Dask 无法知道它在哪 file/part。
- 你调用了两次计算,但可以将它们结合起来:Dask 努力从内存中逐出当前任何计算都不需要的数据,所以你做了双倍的工作。通过在两个输出上调用计算,您可以将时间减半。
进一步说明:
- 显然,如果您知道哪个分区包含什么,您会做得更好
- 您可以使用进程绕过 GIL,例如 Dask 的分布式调度程序
- 如果你只需要某些列,不要费心加载所有内容然后再选择,将这些列直接包含在 read_csv 函数中,节省大量时间和内存(对于 pandas或达斯克)。
同时计算两个惰性事物:
dask.compute(P1MI3, P1MJC_main)