加载多个镶木地板文件时保留 dask 数据帧划分
Preserving dask dataframe divisions when loading multiple parquet files
我在以时间为索引的数据框中有一些时间序列数据。索引已排序,数据存储在多个 parquet 文件中,每个文件中有一天的数据。我使用 dask 2.9.1
当我从一个 parquet 文件加载数据时,分区设置正确。
当我从多个文件加载数据时,我没有在生成的 dask 数据帧中得到差异。
下面的例子说明了这个问题:
import pandas as pd
import pandas.util.testing as tm
import dask.dataframe as dd
df = tm.makeTimeDataFrame( 48, "H")
df1 = df[:24].sort_index()
df2 = df[24:].sort_index()
dd.from_pandas( df1, npartitions=1 ).to_parquet( "df1d.parq", engine="fastparquet" )
dd.from_pandas( df2, npartitions=1 ).to_parquet( "df2d.parq", engine="fastparquet" )
ddf = dd.read_parquet( "df*d.parq", infer_divisions=True, sorted_index=True, engine="fastparquet" )
print(ddf.npartitions, ddf.divisions)
这里我得到 2 个分区和 (None, None, None)
作为分区
我可以 dd.read_parquet 将分区设置为实际值吗?
更新
在我的实际数据中,我每天都有一个镶木地板文件。
这些文件是通过保存数据帧中的数据创建的,其中时间戳用作索引。索引已排序。每个文件的大小为 100-150MB,当加载到内存中时,它使用 2.5GB 的 RAM,激活索引很重要,因为重新创建索引非常繁重。
我没能在 read_parquet 上找到参数或引擎的组合,使其在负载上产生分裂。
数据文件被命名为 "yyyy-mm-dd.parquet",所以我根据该信息创建分区:
from pathlib import Path
files = list (Path("e:/data").glob("2019-06-*.parquet") )
divisions = [ pd.Timestamp( f.stem) for f in files ] + [ pd.Timestamp( files[-1].stem) + pd.Timedelta(1, unit='D' ) ]
ddf = dd.read_parquet( files )
ddf.divisions = divisions
这没有启用索引,在某些情况下它失败了 "TypeError: can only concatenate tuple (not "list") to tuple"
然后我尝试将分区设置为元组
ddf.divisions = tuple(divisions)
然后成功了。当索引设置正确时,dask 非常快
更新 2
更好的方法是单独读取 dask 数据帧,然后将它们连接起来:
from pathlib import Path
import dask.dataframe as dd
files = list (Path("e:/data").glob("2019-06-*.parquet") )
ddfs = [ dd.read_parquet( f ) for f in files ]
ddf = dd.concat(ddfs, axis=0)
以这种方式设置了分区,它还解决了另一个处理随时间增加列的问题。
下面我将原题重写为使用concat,解决了我的问题
import pandas as pd
import pandas.util.testing as tm
import dask.dataframe as dd
# create two example parquet files
df = tm.makeTimeDataFrame( 48, "H")
df1 = df[:24].sort_index()
df2 = df[24:].sort_index()
dd.from_pandas( df1, npartitions=1 ).to_parquet( "df1d.parq" )
dd.from_pandas( df2, npartitions=1 ).to_parquet( "df2d.parq" )
# read the files and concatenate
ddf = dd.concat([dd.read_parquet( d ) for d in ["df1d.parq", "df2d.parq"] ], axis=0)
print(ddf.npartitions, ddf.divisions)
我仍然得到预期的 2 个分区,但现在分区是 (Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-02 00:00:00'), Timestamp('2000-01-02 23:00:00'))
我在以时间为索引的数据框中有一些时间序列数据。索引已排序,数据存储在多个 parquet 文件中,每个文件中有一天的数据。我使用 dask 2.9.1
当我从一个 parquet 文件加载数据时,分区设置正确。
当我从多个文件加载数据时,我没有在生成的 dask 数据帧中得到差异。
下面的例子说明了这个问题:
import pandas as pd
import pandas.util.testing as tm
import dask.dataframe as dd
df = tm.makeTimeDataFrame( 48, "H")
df1 = df[:24].sort_index()
df2 = df[24:].sort_index()
dd.from_pandas( df1, npartitions=1 ).to_parquet( "df1d.parq", engine="fastparquet" )
dd.from_pandas( df2, npartitions=1 ).to_parquet( "df2d.parq", engine="fastparquet" )
ddf = dd.read_parquet( "df*d.parq", infer_divisions=True, sorted_index=True, engine="fastparquet" )
print(ddf.npartitions, ddf.divisions)
这里我得到 2 个分区和 (None, None, None)
作为分区
我可以 dd.read_parquet 将分区设置为实际值吗?
更新
在我的实际数据中,我每天都有一个镶木地板文件。
这些文件是通过保存数据帧中的数据创建的,其中时间戳用作索引。索引已排序。每个文件的大小为 100-150MB,当加载到内存中时,它使用 2.5GB 的 RAM,激活索引很重要,因为重新创建索引非常繁重。
我没能在 read_parquet 上找到参数或引擎的组合,使其在负载上产生分裂。
数据文件被命名为 "yyyy-mm-dd.parquet",所以我根据该信息创建分区:
from pathlib import Path
files = list (Path("e:/data").glob("2019-06-*.parquet") )
divisions = [ pd.Timestamp( f.stem) for f in files ] + [ pd.Timestamp( files[-1].stem) + pd.Timedelta(1, unit='D' ) ]
ddf = dd.read_parquet( files )
ddf.divisions = divisions
这没有启用索引,在某些情况下它失败了 "TypeError: can only concatenate tuple (not "list") to tuple"
然后我尝试将分区设置为元组
ddf.divisions = tuple(divisions)
然后成功了。当索引设置正确时,dask 非常快
更新 2
更好的方法是单独读取 dask 数据帧,然后将它们连接起来:
from pathlib import Path
import dask.dataframe as dd
files = list (Path("e:/data").glob("2019-06-*.parquet") )
ddfs = [ dd.read_parquet( f ) for f in files ]
ddf = dd.concat(ddfs, axis=0)
以这种方式设置了分区,它还解决了另一个处理随时间增加列的问题。
下面我将原题重写为使用concat,解决了我的问题
import pandas as pd
import pandas.util.testing as tm
import dask.dataframe as dd
# create two example parquet files
df = tm.makeTimeDataFrame( 48, "H")
df1 = df[:24].sort_index()
df2 = df[24:].sort_index()
dd.from_pandas( df1, npartitions=1 ).to_parquet( "df1d.parq" )
dd.from_pandas( df2, npartitions=1 ).to_parquet( "df2d.parq" )
# read the files and concatenate
ddf = dd.concat([dd.read_parquet( d ) for d in ["df1d.parq", "df2d.parq"] ], axis=0)
print(ddf.npartitions, ddf.divisions)
我仍然得到预期的 2 个分区,但现在分区是 (Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-02 00:00:00'), Timestamp('2000-01-02 23:00:00'))