Sample Pyspark DataFrame by date with number of days from last entry between each sample

Given a DataFrame:

import datetime
from pyspark.sql import Row

dataframe_rows = {
    Row(id = "A", date = datetime.datetime(2015, 1, 18)),
    Row(id = "A", date = datetime.datetime(2015, 2, 21)),
    Row(id = "A", date = datetime.datetime(2015, 2, 22)),
    Row(id = "A", date = datetime.datetime(2015, 6, 30)),
    Row(id = "A", date = datetime.datetime(2017, 12, 31)),
    Row(id = "B", date = datetime.datetime(2019, 1, 18)),
    Row(id = "B", date = datetime.datetime(2019, 1, 21)),
    Row(id = "B", date = datetime.datetime(2019, 2, 22)),
    Row(id = "B", date = datetime.datetime(2019, 2, 28)),
    Row(id = "B", date = datetime.datetime(2019, 12, 13)),
}

df_example = spark.createDataFrame(dataframe_rows).orderBy(["id", "date"], ascending=[1, 1])

so

df_example.show()

yields

+---+-------------------+
| id|               date|
+---+-------------------+
|  A|2015-01-18 00:00:00|
|  A|2015-02-21 00:00:00|
|  A|2015-02-22 00:00:00|
|  A|2015-06-30 00:00:00|
|  A|2017-12-31 00:00:00|
|  B|2019-01-18 00:00:00|
|  B|2019-01-21 00:00:00|
|  B|2019-02-22 00:00:00|
|  B|2019-02-28 00:00:00|
|  B|2019-12-13 00:00:00|
+---+-------------------+

I would like a function that samples rows from this DataFrame so that there is a specified number of days between each sample, and so that the last date for each id in the original DataFrame is also the last date for that id in the sampled DataFrame.

For example, using 14 days between each sample:

+---+-------------------+
| id|               date|
+---+-------------------+
|  A|2015-01-18 00:00:00|
|  A|2015-02-22 00:00:00|
|  A|2015-06-30 00:00:00|
|  A|2017-12-31 00:00:00|
|  B|2019-01-18 00:00:00|
|  B|2019-02-28 00:00:00|
|  B|2019-12-13 00:00:00|
+---+-------------------+

Note that the last date for each id is the same as the last date in the original DataFrame.

EDIT: The solution below works for the original DataFrame I provided, but if I change it to

import datetime
from pyspark.sql import Row

dataframe_rows = {
    Row(id = "A", date = datetime.datetime(2000, 11, 12)),
    Row(id = "A", date = datetime.datetime(2000, 12, 13)),
    Row(id = "A", date = datetime.datetime(2000, 12, 29)),
    Row(id = "A", date = datetime.datetime(2000, 12, 30)),
    Row(id = "A", date = datetime.datetime(2000, 12, 31)),
    Row(id = "B", date = datetime.datetime(2002, 2, 18)),
    Row(id = "B", date = datetime.datetime(2002, 2, 21)),
    Row(id = "B", date = datetime.datetime(2002, 2, 27)),
    Row(id = "B", date = datetime.datetime(2002, 2, 28)),
    Row(id = "B", date = datetime.datetime(2002, 12, 13)),
}

df_example = spark.createDataFrame(dataframe_rows).orderBy(["id", "date"], ascending=[1, 1])
df_example.show()

it yields

+---+-------------------+
| id|               date|
+---+-------------------+
|  A|2000-11-12 00:00:00|
|  A|2000-12-13 00:00:00|
|  A|2000-12-29 00:00:00|
|  A|2000-12-30 00:00:00|
|  A|2000-12-31 00:00:00|
|  B|2002-02-18 00:00:00|
|  B|2002-02-21 00:00:00|
|  B|2002-02-27 00:00:00|
|  B|2002-02-28 00:00:00|
|  B|2002-12-13 00:00:00|
+---+-------------------+

and apply the code, I get

+---+----------+
| id|      date|
+---+----------+
|  A|2000-11-12|
|  A|2000-12-13|
|  A|2000-12-31|
|  B|2002-02-27|
|  B|2002-02-28|
|  B|2002-12-13|
+---+----------+

I'm not sure why both February dates are present. I expected to see

+---+----------+
| id|      date|
+---+----------+
|  A|2000-11-12|
|  A|2000-12-13|
|  A|2000-12-31|
|  B|2002-02-28|
|  B|2002-12-13|
+---+----------+

Any ideas?

There is no direct timestamp resampling functionality in pyspark. However, I found a helper function in this blogpost that solves the problem. The function converts the timestamp to a unix timestamp and aggregates the data according to the resampling interval you specify.

A unix timestamp represents a point in time as the number of seconds elapsed since 1 January 1970. For example, 1 January 1970 00:00:00 = 0 seconds, 1 January 1970 01:00:00 = 3600 seconds, and 2 January 1970 00:00:00 = 86400 seconds.

After creating the resampled column with this function, you can proceed with .groupBy() and aggregate using F.last() in pyspark.
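For reference, the helper below relies on only two built-in functions, F.unix_timestamp and F.from_unixtime. Here is a minimal sketch of what they do to the date column (df_example as defined above; the exact second values depend on the Spark session time zone):

import pyspark.sql.functions as F

# Round-trip the timestamp through unix seconds, purely for illustration.
# unix_timestamp(): seconds since 1970-01-01 00:00:00 UTC (session time zone applies).
# from_unixtime(): converts those seconds back into a readable timestamp string.
df_example\
    .withColumn('unix_seconds', F.unix_timestamp('date'))\
    .withColumn('round_trip', F.from_unixtime('unix_seconds'))\
    .show()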

Resampling function

EDIT (19 May 2021): Added an offset parameter

import pyspark.sql.functions as F

def resample(column, agg_interval=900, offset=0, time_format='yyyy-MM-dd HH:mm:ss'):
    if type(column)==str:
        column = F.col(column)

    # Convert the timestamp to unix timestamp format.
    # Unix timestamp = number of seconds since 00:00:00 UTC, 1 January 1970.
    col_ut =  F.unix_timestamp(column, format=time_format)

    # Divide the time into discrete intervals by rounding down.
    col_ut_agg = F.floor((col_ut + offset) / agg_interval) * agg_interval

    # Convert to and return a human readable timestamp
    return F.from_unixtime(col_ut_agg)

Resampling without offset

# 14 days = 60 seconds * 60 minutes * 24 hours * 14 days
df_example\
    .withColumn('date_resampled', resample(df_example.date, 60*60*24*14))\
    .groupBy('date_resampled')\
    .agg(F.last('id').alias('id'), F.last('date').alias('date'))\
    .orderBy(['id', 'date'])\
    .show()

Output:

+-------------------+---+-------------------+
|     date_resampled| id|               date|
+-------------------+---+-------------------+
|2000-11-09 00:00:00|  A|2000-11-12 00:00:00|
|2000-12-07 00:00:00|  A|2000-12-13 00:00:00|
|2000-12-21 00:00:00|  A|2000-12-31 00:00:00|
|2002-02-14 00:00:00|  B|2002-02-27 00:00:00|
|2002-02-28 00:00:00|  B|2002-02-28 00:00:00|
|2002-12-05 00:00:00|  B|2002-12-13 00:00:00|
+-------------------+---+-------------------+
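This also shows why both February dates survive in your second example: the 14-day intervals are counted from the epoch, so 2002-02-27 falls into the bucket starting 2002-02-14 while 2002-02-28 starts a new bucket. A quick plain-Python check of the same floor arithmetic (assuming a UTC session time zone; bucket_start is just a hypothetical helper mirroring resample):

import datetime

EPOCH = datetime.datetime(1970, 1, 1)
INTERVAL = 60 * 60 * 24 * 14  # 14 days in seconds

def bucket_start(ts, offset=0):
    # Same logic as resample(): floor((unix_ts + offset) / interval) * interval.
    unix_ts = int((ts - EPOCH).total_seconds())
    return EPOCH + datetime.timedelta(seconds=(unix_ts + offset) // INTERVAL * INTERVAL)

print(bucket_start(datetime.datetime(2002, 2, 27)))  # 2002-02-14 00:00:00
print(bucket_start(datetime.datetime(2002, 2, 28)))  # 2002-02-28 00:00:00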

Resampling with offset

Without an offset, the resampling intervals are counted from 1 January 1970 00:00:00 GMT. With an offset of 1 day, they are counted from 2 January 1970 00:00:00 GMT.

# 14 days = 60 seconds * 60 minutes * 24 hours * 14 days
# offset = 10 days
df_example\
    .withColumn('date_resampled', resample(df_example.date, 60*60*24*14, 60*60*24*10))\
    .groupBy('date_resampled')\
    .agg(F.last('id').alias('id'), F.last('date').alias('date'))\
    .orderBy(['id', 'date'])\
    .show()

Output:

+-------------------+---+-------------------+
|     date_resampled| id|               date|
+-------------------+---+-------------------+
|2000-11-09 00:00:00|  A|2000-11-12 00:00:00|
|2000-12-21 00:00:00|  A|2000-12-13 00:00:00|
|2001-01-04 00:00:00|  A|2000-12-31 00:00:00|
|2002-02-28 00:00:00|  B|2002-02-28 00:00:00|
|2002-12-19 00:00:00|  B|2002-12-13 00:00:00|
+-------------------+---+-------------------+
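With the 10-day offset, both February dates for id B now land in the same bucket, so F.last() keeps a single row for that bucket (2002-02-28 in the output above), which matches the expected output in the question. Continuing the plain-Python check from above (same assumptions):

OFFSET = 60 * 60 * 24 * 10  # 10 days in seconds

print(bucket_start(datetime.datetime(2002, 2, 27), offset=OFFSET))  # 2002-02-28 00:00:00
print(bucket_start(datetime.datetime(2002, 2, 28), offset=OFFSET))  # 2002-02-28 00:00:00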