Dask 数据帧:读取多个文件并将文件名存储在列中
Dask dataframes: reading multiple files & storing filename in column
我经常使用dask.dataframe
读取多个文件,因此:
import dask.dataframe as dd
df = dd.read_csv('*.csv')
但是,每一行的来源,即从哪个文件读取数据,似乎永远丢失了。
有没有办法将其添加为一列,例如df.loc[:100, 'partition'] = 'file1.csv'
如果 file1.csv
是第一个文件并且包含 100 行。当 compute
作为工作流的一部分被触发时,这将应用于读入数据帧的每个 "partition" / 文件。
想法是可以根据来源应用不同的逻辑。
假设您有或可以制作一个 file_list
列表,其中包含每个 csv 文件的文件路径,并且每个单独的文件都适合 RAM(您提到的 100 行),那么这应该可行:
import pandas as pd
import dask.dataframe as dd
from dask import delayed
def read_and_label_csv(filename):
# reads each csv file to a pandas.DataFrame
df_csv = pd.read_csv(filename)
df_csv['partition'] = filename.split('\')[-1]
return df_csv
# create a list of functions ready to return a pandas.DataFrame
dfs = [delayed(read_and_label_csv)(fname) for fname in file_list]
# using delayed, assemble the pandas.DataFrames into a dask.DataFrame
ddf = dd.from_delayed(dfs)
当然要进行一些定制。如果您的 csv 文件大于 RAM,那么 dask.DataFrame
s 的连接可能是可行的方法。
Dask 函数 read_csv, read_table, and read_fwf 现在包含一个参数 include_path_column
:
include_path_column:bool or str, optional
Whether or not to include the path to each particular file.
If True a new column is added to the dataframe called path.
If str, sets new column name. Default is False.
我经常使用dask.dataframe
读取多个文件,因此:
import dask.dataframe as dd
df = dd.read_csv('*.csv')
但是,每一行的来源,即从哪个文件读取数据,似乎永远丢失了。
有没有办法将其添加为一列,例如df.loc[:100, 'partition'] = 'file1.csv'
如果 file1.csv
是第一个文件并且包含 100 行。当 compute
作为工作流的一部分被触发时,这将应用于读入数据帧的每个 "partition" / 文件。
想法是可以根据来源应用不同的逻辑。
假设您有或可以制作一个 file_list
列表,其中包含每个 csv 文件的文件路径,并且每个单独的文件都适合 RAM(您提到的 100 行),那么这应该可行:
import pandas as pd
import dask.dataframe as dd
from dask import delayed
def read_and_label_csv(filename):
# reads each csv file to a pandas.DataFrame
df_csv = pd.read_csv(filename)
df_csv['partition'] = filename.split('\')[-1]
return df_csv
# create a list of functions ready to return a pandas.DataFrame
dfs = [delayed(read_and_label_csv)(fname) for fname in file_list]
# using delayed, assemble the pandas.DataFrames into a dask.DataFrame
ddf = dd.from_delayed(dfs)
当然要进行一些定制。如果您的 csv 文件大于 RAM,那么 dask.DataFrame
s 的连接可能是可行的方法。
Dask 函数 read_csv, read_table, and read_fwf 现在包含一个参数 include_path_column
:
include_path_column:bool or str, optional
Whether or not to include the path to each particular file.
If True a new column is added to the dataframe called path.
If str, sets new column name. Default is False.