如何将多个 pandas 数据帧连接成一个大于内存的 dask 数据帧?
How to concat multiple pandas dataframes into one dask dataframe larger than memory?
我正在解析以制表符分隔的数据以创建表格数据,我想将其存储在 HDF5 中。
我的问题是我必须将数据聚合成一种格式,然后转储到 HDF5 中。这是 ~1 TB 大小的数据,所以我自然无法将其放入 RAM。 Dask 可能是完成此任务的最佳方式。
如果我使用解析我的数据来适应一个 pandas 数据框,我会这样做:
import pandas as pd
import csv
csv_columns = ["COL1", "COL2", "COL3", "COL4",..., "COL55"]
readcsvfile = csv.reader(csvfile)
total_df = pd.DataFrame() # create empty pandas DataFrame
for i, line in readcsvfile:
# parse create dictionary of key:value pairs by table field:value, "dictionary_line"
# save dictionary as pandas dataframe
df = pd.DataFrame(dictionary_line, index=[i]) # one line tabular data
total_df = pd.concat([total_df, df]) # creates one big dataframe
使用 dask 完成相同的任务,看来用户应该尝试这样的事情:
import pandas as pd
import csv
import dask.dataframe as dd
import dask.array as da
csv_columns = ["COL1", "COL2", "COL3", "COL4",..., "COL55"] # define columns
readcsvfile = csv.reader(csvfile) # read in file, if csv
# somehow define empty dask dataframe total_df = dd.Dataframe()?
for i, line in readcsvfile:
# parse create dictionary of key:value pairs by table field:value, "dictionary_line"
# save dictionary as pandas dataframe
df = pd.DataFrame(dictionary_line, index=[i]) # one line tabular data
total_df = da.concatenate([total_df, df]) # creates one big dataframe
创建 ~TB 数据帧后,我将保存到 hdf5。
我的问题是 total_df
不适合 RAM,必须保存到磁盘。 dask
dataframe 能完成这个任务吗?
我应该试试别的吗?从多个 dask 阵列创建 HDF5 会更容易吗,即每个 column/field 一个 dask 阵列?也许在几个节点之间划分数据帧并在最后减少?
编辑:为了清楚起见,我实际上并不是直接从 csv 文件中读取数据。我正在聚合、解析和格式化表格数据。因此,readcsvfile = csv.reader(csvfile)
在上面用于 clarity/brevity,但它比读取 csv 文件要复杂得多。
Dask.dataframe 通过惰性处理大于内存的数据集。将具体数据附加到 dask.dataframe 不会有成效。
如果您的数据可以被pd.read_csv
处理
pandas.read_csv功能非常灵活。你在上面说你的解析过程非常复杂,但可能仍然值得研究 pd.read_csv
的选项,看看它是否仍然有效。 dask.dataframe.read_csv
函数支持这些相同的参数。
特别是如果担心您的数据是由制表符而不是逗号分隔的,这根本不是问题。 Pandas 支持 sep='\t'
关键字以及几十个其他选项。
考虑dask.bag
如果您想逐行操作文本文件,请考虑使用 dask.bag 来解析您的数据,从一堆文本开始。
import dask.bag as db
b = db.read_text('myfile.tsv', blocksize=10000000) # break into 10MB chunks
records = b.str.split('\t').map(parse)
df = records.to_dataframe(columns=...)
写入 HDF5 文件
一旦你 dask.dataframe 尝试 .to_hdf
方法:
df.to_hdf('myfile.hdf5', '/df')
我正在解析以制表符分隔的数据以创建表格数据,我想将其存储在 HDF5 中。
我的问题是我必须将数据聚合成一种格式,然后转储到 HDF5 中。这是 ~1 TB 大小的数据,所以我自然无法将其放入 RAM。 Dask 可能是完成此任务的最佳方式。
如果我使用解析我的数据来适应一个 pandas 数据框,我会这样做:
import pandas as pd
import csv
csv_columns = ["COL1", "COL2", "COL3", "COL4",..., "COL55"]
readcsvfile = csv.reader(csvfile)
total_df = pd.DataFrame() # create empty pandas DataFrame
for i, line in readcsvfile:
# parse create dictionary of key:value pairs by table field:value, "dictionary_line"
# save dictionary as pandas dataframe
df = pd.DataFrame(dictionary_line, index=[i]) # one line tabular data
total_df = pd.concat([total_df, df]) # creates one big dataframe
使用 dask 完成相同的任务,看来用户应该尝试这样的事情:
import pandas as pd
import csv
import dask.dataframe as dd
import dask.array as da
csv_columns = ["COL1", "COL2", "COL3", "COL4",..., "COL55"] # define columns
readcsvfile = csv.reader(csvfile) # read in file, if csv
# somehow define empty dask dataframe total_df = dd.Dataframe()?
for i, line in readcsvfile:
# parse create dictionary of key:value pairs by table field:value, "dictionary_line"
# save dictionary as pandas dataframe
df = pd.DataFrame(dictionary_line, index=[i]) # one line tabular data
total_df = da.concatenate([total_df, df]) # creates one big dataframe
创建 ~TB 数据帧后,我将保存到 hdf5。
我的问题是 total_df
不适合 RAM,必须保存到磁盘。 dask
dataframe 能完成这个任务吗?
我应该试试别的吗?从多个 dask 阵列创建 HDF5 会更容易吗,即每个 column/field 一个 dask 阵列?也许在几个节点之间划分数据帧并在最后减少?
编辑:为了清楚起见,我实际上并不是直接从 csv 文件中读取数据。我正在聚合、解析和格式化表格数据。因此,readcsvfile = csv.reader(csvfile)
在上面用于 clarity/brevity,但它比读取 csv 文件要复杂得多。
Dask.dataframe 通过惰性处理大于内存的数据集。将具体数据附加到 dask.dataframe 不会有成效。
如果您的数据可以被pd.read_csv
处理pandas.read_csv功能非常灵活。你在上面说你的解析过程非常复杂,但可能仍然值得研究 pd.read_csv
的选项,看看它是否仍然有效。 dask.dataframe.read_csv
函数支持这些相同的参数。
特别是如果担心您的数据是由制表符而不是逗号分隔的,这根本不是问题。 Pandas 支持 sep='\t'
关键字以及几十个其他选项。
考虑dask.bag
如果您想逐行操作文本文件,请考虑使用 dask.bag 来解析您的数据,从一堆文本开始。
import dask.bag as db
b = db.read_text('myfile.tsv', blocksize=10000000) # break into 10MB chunks
records = b.str.split('\t').map(parse)
df = records.to_dataframe(columns=...)
写入 HDF5 文件
一旦你 dask.dataframe 尝试 .to_hdf
方法:
df.to_hdf('myfile.hdf5', '/df')