使用非唯一索引列日期在 Dask 数据框中提取最新值
Extracting latest values in a Dask dataframe with non-unique index column dates
我对 pandas 数据帧非常熟悉,但我对 Dask 还很陌生,所以我仍在努力使我的代码并行化。
我已经使用 pandas 和 pandarallel 获得了我想要的结果,所以我想弄清楚的是我是否可以使用 Dask 扩大任务或以某种方式加速它。
假设我的数据框具有作为非唯一索引的日期时间、一个值列和一个 ID 列。
time value id
2021-01-01 00:00:00.210281 28.08 293707
2021-01-01 00:00:00.279228 28.07 293708
2021-01-01 00:00:00.697341 28.08 293709
2021-01-01 00:00:00.941704 28.08 293710
2021-01-01 00:00:00.945422 28.07 293711
... ... ...
2021-01-01 23:59:59.288914 29.84 512665
2021-01-01 23:59:59.288914 29.83 512666
2021-01-01 23:59:59.288914 29.82 512667
2021-01-01 23:59:59.525227 29.84 512668
2021-01-01 23:59:59.784754 29.84 512669
我要提取的是每一秒的最新值。例如如果 2021-01-01 00:00:01
之前的价格是索引为 2021-01-01 00:00:00.945422
的行,则最新值为 28.07
.
就我而言,有时索引值不是唯一的,因此作为决胜局,我想使用 id
列。具有最大 id
数字的值将被视为最新值。对于在时间 2021-01-01 23:59:59.288914
出现的三个值并列的情况,将选择值 29.82
,因为该日期的最大 id
将是 512667
。另请注意,id
在整个数据集中并不一致,我不能只依赖它来排序我的数据。
在pandas中,我只是通过获取最后一个索引来做到这一点
last_index = df.loc[date_minus60: date_curr].index[-1]
last_values = df.loc[last_index]
然后如果last_values.index.is_unique
的值为假,我最后执行last_values.sort_values('id').iloc[-1]
。
我一直很难将此代码转换为 Dask,遇到有关我的延迟函数的问题,导致他们需要计算才能再次重新索引我的数据帧。
我想知道是否有处理这类问题的最佳实践。
下面的代码片段表明它是一个非常相似的语法:
import dask
# generate dask dataframe
ddf = dask.datasets.timeseries(freq="500ms", partition_freq="1h")
# generate a pandas dataframe
df = ddf.partitions[0].compute() # pandas df for example
# sample dates
date_minus60 = "2000-01-01 00:00:00.000"
date_curr = "2000-01-01 00:00:02.000"
# pandas code
last_index_pandas = df.loc[date_minus60:date_curr].index[-1]
last_values_pandas = df.loc[last_index_pandas]
# dask code
last_index_dask = ddf.loc[date_minus60:date_curr].compute().index[-1]
last_values_dask = ddf.loc[last_index_dask].compute()
# check equality of the results
print(last_values_pandas == last_values_dask)
请注意,在 dask
版本中,区别在于两个 .compute
步骤,因为需要计算两个惰性值:首先是找出正确的索引位置,其次是获取实际价值。此外,这还假设数据已经被时间戳索引,如果不是,最好在加载到 dask
之前索引数据,因为 .set_index
通常是一个缓慢的操作。
不过,这可能不是很好用dask
,这取决于你的真实目的。如果潜在的想法是进行快速查找,那么更好的解决方案是使用索引数据库(包括专门的时间序列数据库)。
最后,上面的代码片段使用了唯一索引。如果实际数据具有非唯一索引,那么一旦 last_values_dask
被计算出来,就应该通过使用类似这样的东西(伪代码,预计不会马上工作):
def get_largest_id(last_values):
return last_values.sort_values('id').tail(1)
last_values_dask = get_largest_id(last_values_dask)
如果查找是针对批次(而不是特定的样本日期),则可以设计更好的管道。
@Kafkaesque
这是另一种考虑使用 map_partitions
, which maps a custom function across each partition, treating each as a Pandas DataFrame. Generally, it's advisable to use dask.dataframe
methods directly. In this case, however, dask.DataFrame.sort_values
only supports sorting by a single column, so map_partitions
is a good alternative. You may also find these Dask Groupby examples 有用的方法。
值得注意的是,使用 map_partitions
+ groupby
仅当您的数据集已经排序时才有效,这样相同的秒数位于同一分区中。以下示例针对数据未排序的情况:
import dask
import dask.dataframe as dd
import pandas as pd
# example dataset, use sample() to "unsort"
ddf = dask.datasets.timeseries(
freq="250ms", partition_freq="5d", seed=42
).sample(frac=0.9, replace=True, random_state=42)
# first set the rounded timestamp as the index before calling map_partitions
# (don't need to reset the index if your dataset is already sorted)
ddf = ddf.reset_index()
ddf = ddf.assign(round_timestamp=ddf['timestamp'].dt.floor('S')).set_index('round_timestamp')
def custom_func(df):
return (
df
.sort_values(by=['timestamp', 'id'])
.groupby('round_timestamp')
.last()
)
new_ddf = ddf.map_partitions(custom_func)
# shows embarrassingly parallel execution of 'custom_func' across each partition
new_ddf.visualize(optimize_graph=True)
# check the result of the first partition
new_ddf.partitions[0].compute()
我对 pandas 数据帧非常熟悉,但我对 Dask 还很陌生,所以我仍在努力使我的代码并行化。 我已经使用 pandas 和 pandarallel 获得了我想要的结果,所以我想弄清楚的是我是否可以使用 Dask 扩大任务或以某种方式加速它。
假设我的数据框具有作为非唯一索引的日期时间、一个值列和一个 ID 列。
time value id
2021-01-01 00:00:00.210281 28.08 293707
2021-01-01 00:00:00.279228 28.07 293708
2021-01-01 00:00:00.697341 28.08 293709
2021-01-01 00:00:00.941704 28.08 293710
2021-01-01 00:00:00.945422 28.07 293711
... ... ...
2021-01-01 23:59:59.288914 29.84 512665
2021-01-01 23:59:59.288914 29.83 512666
2021-01-01 23:59:59.288914 29.82 512667
2021-01-01 23:59:59.525227 29.84 512668
2021-01-01 23:59:59.784754 29.84 512669
我要提取的是每一秒的最新值。例如如果 2021-01-01 00:00:01
之前的价格是索引为 2021-01-01 00:00:00.945422
的行,则最新值为 28.07
.
就我而言,有时索引值不是唯一的,因此作为决胜局,我想使用 id
列。具有最大 id
数字的值将被视为最新值。对于在时间 2021-01-01 23:59:59.288914
出现的三个值并列的情况,将选择值 29.82
,因为该日期的最大 id
将是 512667
。另请注意,id
在整个数据集中并不一致,我不能只依赖它来排序我的数据。
在pandas中,我只是通过获取最后一个索引来做到这一点
last_index = df.loc[date_minus60: date_curr].index[-1]
last_values = df.loc[last_index]
然后如果last_values.index.is_unique
的值为假,我最后执行last_values.sort_values('id').iloc[-1]
。
我一直很难将此代码转换为 Dask,遇到有关我的延迟函数的问题,导致他们需要计算才能再次重新索引我的数据帧。
我想知道是否有处理这类问题的最佳实践。
下面的代码片段表明它是一个非常相似的语法:
import dask
# generate dask dataframe
ddf = dask.datasets.timeseries(freq="500ms", partition_freq="1h")
# generate a pandas dataframe
df = ddf.partitions[0].compute() # pandas df for example
# sample dates
date_minus60 = "2000-01-01 00:00:00.000"
date_curr = "2000-01-01 00:00:02.000"
# pandas code
last_index_pandas = df.loc[date_minus60:date_curr].index[-1]
last_values_pandas = df.loc[last_index_pandas]
# dask code
last_index_dask = ddf.loc[date_minus60:date_curr].compute().index[-1]
last_values_dask = ddf.loc[last_index_dask].compute()
# check equality of the results
print(last_values_pandas == last_values_dask)
请注意,在 dask
版本中,区别在于两个 .compute
步骤,因为需要计算两个惰性值:首先是找出正确的索引位置,其次是获取实际价值。此外,这还假设数据已经被时间戳索引,如果不是,最好在加载到 dask
之前索引数据,因为 .set_index
通常是一个缓慢的操作。
不过,这可能不是很好用dask
,这取决于你的真实目的。如果潜在的想法是进行快速查找,那么更好的解决方案是使用索引数据库(包括专门的时间序列数据库)。
最后,上面的代码片段使用了唯一索引。如果实际数据具有非唯一索引,那么一旦 last_values_dask
被计算出来,就应该通过使用类似这样的东西(伪代码,预计不会马上工作):
def get_largest_id(last_values):
return last_values.sort_values('id').tail(1)
last_values_dask = get_largest_id(last_values_dask)
如果查找是针对批次(而不是特定的样本日期),则可以设计更好的管道。
@Kafkaesque
这是另一种考虑使用 map_partitions
, which maps a custom function across each partition, treating each as a Pandas DataFrame. Generally, it's advisable to use dask.dataframe
methods directly. In this case, however, dask.DataFrame.sort_values
only supports sorting by a single column, so map_partitions
is a good alternative. You may also find these Dask Groupby examples 有用的方法。
值得注意的是,使用 map_partitions
+ groupby
仅当您的数据集已经排序时才有效,这样相同的秒数位于同一分区中。以下示例针对数据未排序的情况:
import dask
import dask.dataframe as dd
import pandas as pd
# example dataset, use sample() to "unsort"
ddf = dask.datasets.timeseries(
freq="250ms", partition_freq="5d", seed=42
).sample(frac=0.9, replace=True, random_state=42)
# first set the rounded timestamp as the index before calling map_partitions
# (don't need to reset the index if your dataset is already sorted)
ddf = ddf.reset_index()
ddf = ddf.assign(round_timestamp=ddf['timestamp'].dt.floor('S')).set_index('round_timestamp')
def custom_func(df):
return (
df
.sort_values(by=['timestamp', 'id'])
.groupby('round_timestamp')
.last()
)
new_ddf = ddf.map_partitions(custom_func)
# shows embarrassingly parallel execution of 'custom_func' across each partition
new_ddf.visualize(optimize_graph=True)
# check the result of the first partition
new_ddf.partitions[0].compute()