Sample Pyspark DataFrame by date with number of days from last entry between each sample
Given a DataFrame:
import datetime
from pyspark.sql import Row
dataframe_rows = {
    Row(id = "A", date = datetime.datetime(2015, 1, 18)),
    Row(id = "A", date = datetime.datetime(2015, 2, 21)),
    Row(id = "A", date = datetime.datetime(2015, 2, 22)),
    Row(id = "A", date = datetime.datetime(2015, 6, 30)),
    Row(id = "A", date = datetime.datetime(2017, 12, 31)),
    Row(id = "B", date = datetime.datetime(2019, 1, 18)),
    Row(id = "B", date = datetime.datetime(2019, 1, 21)),
    Row(id = "B", date = datetime.datetime(2019, 2, 22)),
    Row(id = "B", date = datetime.datetime(2019, 2, 28)),
    Row(id = "B", date = datetime.datetime(2019, 12, 13)),
}
df_example = spark.createDataFrame(dataframe_rows).orderBy(["id", "date"], ascending=[1, 1])
So
df_example.show()
yields
+---+-------------------+
| id| date|
+---+-------------------+
| A|2015-01-18 00:00:00|
| A|2015-02-21 00:00:00|
| A|2015-02-22 00:00:00|
| A|2015-06-30 00:00:00|
| A|2017-12-31 00:00:00|
| B|2019-01-18 00:00:00|
| B|2019-01-21 00:00:00|
| B|2019-02-22 00:00:00|
| B|2019-02-28 00:00:00|
| B|2019-12-13 00:00:00|
+---+-------------------+
I would like a function that samples rows from this DataFrame so that there is a specified number of days between each sample, and the last date for each id in the sampled DataFrame is the last date for that id in the original DataFrame.
For example, using 14 days between each sample,
+---+-------------------+
| id| date|
+---+-------------------+
| A|2015-01-18 00:00:00|
| A|2015-02-22 00:00:00|
| A|2015-06-30 00:00:00|
| A|2017-12-31 00:00:00|
| B|2019-01-18 00:00:00|
| B|2019-02-28 00:00:00|
| B|2019-12-13 00:00:00|
+---+-------------------+
Note that the last date for each id is the same as the last date in the original DataFrame.
EDIT: The solution below works for the original DataFrame I provided, but if I change it to
from pyspark.sql import Row
dataframe_rows = {
    Row(id = "A", date = datetime.datetime(2000, 11, 12)),
    Row(id = "A", date = datetime.datetime(2000, 12, 13)),
    Row(id = "A", date = datetime.datetime(2000, 12, 29)),
    Row(id = "A", date = datetime.datetime(2000, 12, 30)),
    Row(id = "A", date = datetime.datetime(2000, 12, 31)),
    Row(id = "B", date = datetime.datetime(2002, 2, 18)),
    Row(id = "B", date = datetime.datetime(2002, 2, 21)),
    Row(id = "B", date = datetime.datetime(2002, 2, 27)),
    Row(id = "B", date = datetime.datetime(2002, 2, 28)),
    Row(id = "B", date = datetime.datetime(2002, 12, 13)),
}
df_example = spark.createDataFrame(dataframe_rows).orderBy(["id", "date"], ascending=[1, 1])
df_example.show()
yields
+---+-------------------+
| id| date|
+---+-------------------+
| A|2000-11-12 00:00:00|
| A|2000-12-13 00:00:00|
| A|2000-12-29 00:00:00|
| A|2000-12-30 00:00:00|
| A|2000-12-31 00:00:00|
| B|2002-02-18 00:00:00|
| B|2002-02-21 00:00:00|
| B|2002-02-27 00:00:00|
| B|2002-02-28 00:00:00|
| B|2002-12-13 00:00:00|
+---+-------------------+
and apply the code, I get
+---+----------+
| id| date|
+---+----------+
| A|2000-11-12|
| A|2000-12-13|
| A|2000-12-31|
| B|2002-02-27|
| B|2002-02-28|
| B|2002-12-13|
+---+----------+
I am not sure why both of the February dates are present. I would expect to see
+---+----------+
| id| date|
+---+----------+
| A|2000-11-12|
| A|2000-12-13|
| A|2000-12-31|
| B|2002-02-28|
| B|2002-12-13|
+---+----------+
Any ideas?
There is no direct timestamp resampling functionality in pyspark, but I found a helper function in this blogpost that solves the problem. The function converts timestamps into unix timestamps and aggregates the data based on the resampling interval you specify.
A unix timestamp is a representation of a timestamp in seconds, counted from 1 January 1970. For example, 1 January 1970 00:00:00 = 0 seconds, 1 January 1970 01:00:00 = 3600 seconds, and 2 January 1970 00:00:00 = 86400 seconds.
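As a quick illustration of the epoch arithmetic in plain Python (purely illustrative, not part of the solution):
import datetime

epoch = datetime.datetime(1970, 1, 1)
# Seconds elapsed since the epoch:
print((datetime.datetime(1970, 1, 1, 1) - epoch).total_seconds())  # 3600.0
print((datetime.datetime(1970, 1, 2) - epoch).total_seconds())     # 86400.0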
After creating the resampled column with this function, you can proceed with .groupBy() and aggregation using F.last() in pyspark.
Resampling function
Edit (19 May 2021): added offset
import pyspark.sql.functions as F
def resample(column, agg_interval=900, offset=0, time_format='yyyy-MM-dd HH:mm:ss'):
    if type(column) == str:
        column = F.col(column)
    # Convert the timestamp to unix timestamp format.
    # Unix timestamp = number of seconds since 00:00:00 UTC, 1 January 1970.
    col_ut = F.unix_timestamp(column, format=time_format)
    # Divide the time into discrete intervals, by rounding.
    col_ut_agg = F.floor((col_ut + offset) / agg_interval) * agg_interval
    # Convert back to and return a human-readable timestamp.
    return F.from_unixtime(col_ut_agg)
Resampling without offset
# 14 days = 60 seconds * 60 minutes * 24 hours * 14 days
df_example\
    .withColumn('date_resampled', resample(df_example.date, 60*60*24*14))\
    .groupBy('date_resampled')\
    .agg(F.last('id').alias('id'), F.last('date').alias('date'))\
    .orderBy(['id', 'date'])\
    .show()
Output:
+-------------------+---+-------------------+
| date_resampled| id| date|
+-------------------+---+-------------------+
|2000-11-09 00:00:00| A|2000-11-12 00:00:00|
|2000-12-07 00:00:00| A|2000-12-13 00:00:00|
|2000-12-21 00:00:00| A|2000-12-31 00:00:00|
|2002-02-14 00:00:00| B|2002-02-27 00:00:00|
|2002-02-28 00:00:00| B|2002-02-28 00:00:00|
|2002-12-05 00:00:00| B|2002-12-13 00:00:00|
+-------------------+---+-------------------+
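This also shows why both February dates survive the no-offset run: the 14-day grid is anchored at the epoch, and 2002-02-28 happens to land exactly on a grid boundary, so 2002-02-27 and 2002-02-28 fall into adjacent buckets. A quick plain-Python check that mirrors the resample() arithmetic (an illustrative sketch, assuming the timestamps are interpreted as UTC):
import datetime

epoch = datetime.datetime(1970, 1, 1)
interval = 60 * 60 * 24 * 14  # 14 days in seconds

def bucket(dt, interval, offset=0):
    # Mirrors resample(): floor((unix_time + offset) / interval) * interval
    ut = (dt - epoch).total_seconds()
    return epoch + datetime.timedelta(seconds=(ut + offset) // interval * interval)

print(bucket(datetime.datetime(2002, 2, 27), interval))  # 2002-02-14 00:00:00
print(bucket(datetime.datetime(2002, 2, 28), interval))  # 2002-02-28 00:00:00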
Resampling with offset
Without an offset, the resampling grid starts at 00:00:00 GMT on 1 January 1970. With an offset of 1 day, the resampling grid starts at 00:00:00 GMT on 2 January 1970.
# 14 days = 60 seconds * 60 minutes * 24 hours * 14 days
# offset = 10 days
df_example\
    .withColumn('date_resampled', resample(df_example.date, 60*60*24*14, 60*60*24*10))\
    .groupBy('date_resampled')\
    .agg(F.last('id').alias('id'), F.last('date').alias('date'))\
    .orderBy(['id', 'date'])\
    .show()
Output:
+-------------------+---+-------------------+
| date_resampled| id| date|
+-------------------+---+-------------------+
|2000-11-09 00:00:00| A|2000-11-12 00:00:00|
|2000-12-21 00:00:00| A|2000-12-13 00:00:00|
|2001-01-04 00:00:00| A|2000-12-31 00:00:00|
|2002-02-28 00:00:00| B|2002-02-28 00:00:00|
|2002-12-19 00:00:00| B|2002-12-13 00:00:00|
+-------------------+---+-------------------+
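With the 10-day offset, the grid boundary no longer splits the two February dates: 2002-02-27 and 2002-02-28 now map to the same bucket, so F.last() keeps only 2002-02-28. The same plain-Python sketch as above, now with the offset applied (again only illustrative, assuming UTC):
import datetime

epoch = datetime.datetime(1970, 1, 1)
interval = 60 * 60 * 24 * 14  # 14 days in seconds
offset = 60 * 60 * 24 * 10    # 10 days in seconds

def bucket(dt, interval, offset=0):
    # Mirrors resample(): floor((unix_time + offset) / interval) * interval
    ut = (dt - epoch).total_seconds()
    return epoch + datetime.timedelta(seconds=(ut + offset) // interval * interval)

print(bucket(datetime.datetime(2002, 2, 27), interval, offset))  # 2002-02-28 00:00:00
print(bucket(datetime.datetime(2002, 2, 28), interval, offset))  # 2002-02-28 00:00:00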