从 Python Pandas / Dask 中的 Parquet 文件中读取一组行?
Read group of rows from Parquet file in Python Pandas / Dask?
我有一个看起来类似于此的 Pandas 数据框:
datetime data1 data2
2021-01-23 00:00:31.140 a1 a2
2021-01-23 00:00:31.140 b1 b2
2021-01-23 00:00:31.140 c1 c2
2021-01-23 00:01:29.021 d1 d2
2021-01-23 00:02:10.540 e1 e2
2021-01-23 00:02:10.540 f1 f2
真实的数据框非常大,每个唯一的时间戳都有几千行。
我想将这个数据框保存到一个 Parquet 文件中,这样我就可以快速读取所有具有特定日期时间索引的行,而无需加载整个文件或循环遍历它。如何在 Python 中正确保存它以及如何快速只读取一个特定日期时间的行?
阅读后,我想要一个包含特定日期时间的所有行的新数据框。例如,我只想从 Parquet 文件中读取日期时间“2021-01-23 00:00:31.140”的行并接收此数据帧:
datetime data1 data2
2021-01-23 00:00:31.140 a1 a2
2021-01-23 00:00:31.140 b1 b2
2021-01-23 00:00:31.140 c1 c2
我想知道可能首先需要将每个时间戳的数据转换成一列,这样才能通过读取列而不是行来访问它?
2021-01-23 00:00:31.140 2021-01-23 00:01:29.021 2021-01-23 00:02:10.540
['a1', 'a2'] ['d1', 'd2'] ['e1', 'e2']
['b1', 'b2'] NaN ['f1', 'f2']
['c1', 'c2'] NaN NaN
非常感谢您的帮助,非常感谢!
一种解决方案是按时间索引您的数据并使用 dask
,这是一个示例:
import dask
import dask.dataframe as dd
df = dask.datasets.timeseries(
start='2000-01-01',
end='2000-01-2',
freq='1s',
partition_freq='1h')
df
print(len(df))
# 86400 rows across 24 files/partitions
%%time
df.loc['2000-01-01 03:40'].compute()
# result returned in about 8 ms
像您建议的那样使用转置数据框并不是最佳选择,因为您最终会得到每个 file/partition.
唯一的数千列(如果不是更多的话)
因此,根据您的数据,工作流程大致如下所示:
import io
data = io.StringIO("""
datetime|data1|data2
2021-01-23 00:00:31.140|a1|a2
2021-01-23 00:00:31.140|b1|b2
2021-01-23 00:00:31.140|c1|c2
2021-01-23 00:01:29.021|d1|d2
2021-01-23 00:02:10.540|e1|e2
2021-01-23 00:02:10.540|f1|f2""")
import pandas as pd
df = pd.read_csv(data, sep='|', parse_dates=['datetime'])
# make sure the date time column was parsed correctly before
# setting it as an index
df = df.set_index('datetime')
import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=3)
ddf.to_parquet('test_parquet')
# note this will create a folder with one file per partition
ddf2 = dd.read_parquet('test_parquet')
ddf2.loc['2021-01-23 00:00:31'].compute()
# if you want to use very precise time, first convert it to datetime format
ts_exact = pd.to_datetime('2021-01-23 00:00:31.140')
ddf2.loc[ts_exact].compute()
我有一个看起来类似于此的 Pandas 数据框:
datetime data1 data2
2021-01-23 00:00:31.140 a1 a2
2021-01-23 00:00:31.140 b1 b2
2021-01-23 00:00:31.140 c1 c2
2021-01-23 00:01:29.021 d1 d2
2021-01-23 00:02:10.540 e1 e2
2021-01-23 00:02:10.540 f1 f2
真实的数据框非常大,每个唯一的时间戳都有几千行。
我想将这个数据框保存到一个 Parquet 文件中,这样我就可以快速读取所有具有特定日期时间索引的行,而无需加载整个文件或循环遍历它。如何在 Python 中正确保存它以及如何快速只读取一个特定日期时间的行?
阅读后,我想要一个包含特定日期时间的所有行的新数据框。例如,我只想从 Parquet 文件中读取日期时间“2021-01-23 00:00:31.140”的行并接收此数据帧:
datetime data1 data2
2021-01-23 00:00:31.140 a1 a2
2021-01-23 00:00:31.140 b1 b2
2021-01-23 00:00:31.140 c1 c2
我想知道可能首先需要将每个时间戳的数据转换成一列,这样才能通过读取列而不是行来访问它?
2021-01-23 00:00:31.140 2021-01-23 00:01:29.021 2021-01-23 00:02:10.540
['a1', 'a2'] ['d1', 'd2'] ['e1', 'e2']
['b1', 'b2'] NaN ['f1', 'f2']
['c1', 'c2'] NaN NaN
非常感谢您的帮助,非常感谢!
一种解决方案是按时间索引您的数据并使用 dask
,这是一个示例:
import dask
import dask.dataframe as dd
df = dask.datasets.timeseries(
start='2000-01-01',
end='2000-01-2',
freq='1s',
partition_freq='1h')
df
print(len(df))
# 86400 rows across 24 files/partitions
%%time
df.loc['2000-01-01 03:40'].compute()
# result returned in about 8 ms
像您建议的那样使用转置数据框并不是最佳选择,因为您最终会得到每个 file/partition.
唯一的数千列(如果不是更多的话)因此,根据您的数据,工作流程大致如下所示:
import io
data = io.StringIO("""
datetime|data1|data2
2021-01-23 00:00:31.140|a1|a2
2021-01-23 00:00:31.140|b1|b2
2021-01-23 00:00:31.140|c1|c2
2021-01-23 00:01:29.021|d1|d2
2021-01-23 00:02:10.540|e1|e2
2021-01-23 00:02:10.540|f1|f2""")
import pandas as pd
df = pd.read_csv(data, sep='|', parse_dates=['datetime'])
# make sure the date time column was parsed correctly before
# setting it as an index
df = df.set_index('datetime')
import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=3)
ddf.to_parquet('test_parquet')
# note this will create a folder with one file per partition
ddf2 = dd.read_parquet('test_parquet')
ddf2.loc['2021-01-23 00:00:31'].compute()
# if you want to use very precise time, first convert it to datetime format
ts_exact = pd.to_datetime('2021-01-23 00:00:31.140')
ddf2.loc[ts_exact].compute()