如何将多个 pandas 数据帧连接成一个大于内存的 dask 数据帧?

How to concat multiple pandas dataframes into one dask dataframe larger than memory?

我正在解析以制表符分隔的数据以创建表格数据,我想将其存储在 HDF5 中。

我的问题是我必须将数据聚合成一种格式,然后转储到 HDF5 中。这是 ~1 TB 大小的数据,所以我自然无法将其放入 RAM。 Dask 可能是完成此任务的最佳方式。

如果我使用解析我的数据来适应一个 pandas 数据框,我会这样做:

import pandas as pd
import csv   

csv_columns = ["COL1", "COL2", "COL3", "COL4",..., "COL55"]
readcsvfile = csv.reader(csvfile)

total_df = pd.DataFrame()    # create empty pandas DataFrame
for i, line in readcsvfile:
    # parse create dictionary of key:value pairs by table field:value, "dictionary_line"
    # save dictionary as pandas dataframe
    df = pd.DataFrame(dictionary_line, index=[i])  # one line tabular data 
    total_df = pd.concat([total_df, df])   # creates one big dataframe

使用 dask 完成相同的任务,看来用户应该尝试这样的事情:

import pandas as pd
import csv 
import dask.dataframe as dd
import dask.array as da

csv_columns = ["COL1", "COL2", "COL3", "COL4",..., "COL55"]   # define columns
readcsvfile = csv.reader(csvfile)       # read in file, if csv

# somehow define empty dask dataframe   total_df = dd.Dataframe()? 
for i, line in readcsvfile:
    # parse create dictionary of key:value pairs by table field:value, "dictionary_line"
    # save dictionary as pandas dataframe
    df = pd.DataFrame(dictionary_line, index=[i])  # one line tabular data 
    total_df = da.concatenate([total_df, df])   # creates one big dataframe

创建 ~TB 数据帧后,我将保存到 hdf5。

我的问题是 total_df 不适合 RAM,必须保存到磁盘。 dask dataframe 能完成这个任务吗?

我应该试试别的吗?从多个 dask 阵列创建 HDF5 会更容易吗,即每个 column/field 一个 dask 阵列?也许在几个节点之间划分数据帧并在最后减少?

编辑:为了清楚起见,我实际上并不是直接从 csv 文件中读取数据。我正在聚合、解析和格式化表格数据。因此,readcsvfile = csv.reader(csvfile) 在上面用于 clarity/brevity,但它比读取 csv 文件要复杂得多。

Dask.dataframe 通过惰性处理大于内存的数据集。将具体数据附加到 dask.dataframe 不会有成效。

如果您的数据可以被pd.read_csv

处理

pandas.read_csv功能非常灵活。你在上面说你的解析过程非常复杂,但可能仍然值得研究 pd.read_csv 的选项,看看它是否仍然有效。 dask.dataframe.read_csv 函数支持这些相同的参数。

特别是如果担心您的数据是由制表符而不是逗号分隔的,这根本不是问题。 Pandas 支持 sep='\t' 关键字以及几十个其他选项。

考虑dask.bag

如果您想逐行操作文本文件,请考虑使用 dask.bag 来解析您的数据,从一堆文本开始。

import dask.bag as db
b = db.read_text('myfile.tsv', blocksize=10000000)  # break into 10MB chunks
records = b.str.split('\t').map(parse)
df = records.to_dataframe(columns=...)

写入 HDF5 文件

一旦你 dask.dataframe 尝试 .to_hdf 方法:

df.to_hdf('myfile.hdf5', '/df')