dask.read_parquet 导致 OOM 错误
dask.read_parquet causes OOM Error
我一直在使用 dask 对多个 csv 文件执行数据清理。此代码工作正常:
import pandas as pd
import glob
import os
from timeit import default_timer
from dask.distributed import Client
import dask.dataframe as dd
cols_to_keep = ["barcode", "salesdate", "storecode", "quantity", "salesvalue", "promotion", "key_row"]
col_types = {'barcode': object,
'salesdate': object,
'storecode': object,
'quantity': float,
'salesvalue': float,
'promotion': object,
'key_row': object}
trans = dd.read_csv(os.path.join(TRANS_PATH, "*.TXT"),
sep=";", usecols=cols_to_keep, dtype=col_types, parse_dates=['salesdate'])
trans = trans[trans['barcode'].isin(barcodes)]
trans_df = trans.compute()
我决定试用 parquet 存储系统,因为它据说更快并且受 dask 支持。使用 pandas' to_parquet()
方法将 csv 文件转换为 .parquet 后,我尝试了以下操作:
cols_to_keep = ["barcode", "salesdate", "storecode", "quantity", "salesvalue", "promotion", "key_row"]
trans = dd.read_parquet(os.path.join(PARQUET_PATH, '*.parquet'), columns=cols_to_keep)
trans = trans[trans['barcode'].isin(barcodes)]
trans_df = trans.compute()
图表开始执行后不久,工作人员 运行 内存不足,我收到多个警告:
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker process 13620 was killed by signal 15
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker process 13396 was killed by signal 15
最后整个程序崩溃了。我的 .parquet 文件不是问题,我可以使用 pandas' read_parquet()
方法加载这些文件。从 dask 实用程序中,我注意到由于某种原因,该图会在使用 .isin
调用执行任何过滤之前尝试读取所有内容:
使用dd.read_csv()
时就不是这样了。在这里,一切 运行s 'in parallel' 所以过滤可以防止 OOM:
有谁知道发生了什么事吗?我错过了什么?
您的问题是使用 pandas.to_parquet()
写入数据。这从数据中创建了一个巨大的行组,当 Dask 读取它时它成为一个分区——Dask 遵循数据中的任何分区。相反,Dask 会自动对 CSV 输入进行分区,而不是假设数据具有固有的分区。
既然您已经在使用 Dask,您也应该使用它来编写 parquet 数据,使用 dask.DataFrame.to_parquet,类似于 Pandas 方法。它会在一个目录中产生多个文件,这些文件将被独立并行读取。
我一直在使用 dask 对多个 csv 文件执行数据清理。此代码工作正常:
import pandas as pd
import glob
import os
from timeit import default_timer
from dask.distributed import Client
import dask.dataframe as dd
cols_to_keep = ["barcode", "salesdate", "storecode", "quantity", "salesvalue", "promotion", "key_row"]
col_types = {'barcode': object,
'salesdate': object,
'storecode': object,
'quantity': float,
'salesvalue': float,
'promotion': object,
'key_row': object}
trans = dd.read_csv(os.path.join(TRANS_PATH, "*.TXT"),
sep=";", usecols=cols_to_keep, dtype=col_types, parse_dates=['salesdate'])
trans = trans[trans['barcode'].isin(barcodes)]
trans_df = trans.compute()
我决定试用 parquet 存储系统,因为它据说更快并且受 dask 支持。使用 pandas' to_parquet()
方法将 csv 文件转换为 .parquet 后,我尝试了以下操作:
cols_to_keep = ["barcode", "salesdate", "storecode", "quantity", "salesvalue", "promotion", "key_row"]
trans = dd.read_parquet(os.path.join(PARQUET_PATH, '*.parquet'), columns=cols_to_keep)
trans = trans[trans['barcode'].isin(barcodes)]
trans_df = trans.compute()
图表开始执行后不久,工作人员 运行 内存不足,我收到多个警告:
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker process 13620 was killed by signal 15
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker process 13396 was killed by signal 15
最后整个程序崩溃了。我的 .parquet 文件不是问题,我可以使用 pandas' read_parquet()
方法加载这些文件。从 dask 实用程序中,我注意到由于某种原因,该图会在使用 .isin
调用执行任何过滤之前尝试读取所有内容:
使用dd.read_csv()
时就不是这样了。在这里,一切 运行s 'in parallel' 所以过滤可以防止 OOM:
有谁知道发生了什么事吗?我错过了什么?
您的问题是使用 pandas.to_parquet()
写入数据。这从数据中创建了一个巨大的行组,当 Dask 读取它时它成为一个分区——Dask 遵循数据中的任何分区。相反,Dask 会自动对 CSV 输入进行分区,而不是假设数据具有固有的分区。
既然您已经在使用 Dask,您也应该使用它来编写 parquet 数据,使用 dask.DataFrame.to_parquet,类似于 Pandas 方法。它会在一个目录中产生多个文件,这些文件将被独立并行读取。