Rolling average and sum by days over timestamp in PySpark
I have a PySpark dataframe in which the timestamps are at day granularity. Here is a sample of the dataframe (let's call it df):
+-----+-----+----------+-----+
| name| type| timestamp|score|
+-----+-----+----------+-----+
|name1|type1|2012-01-10| 11|
|name1|type1|2012-01-11| 14|
|name1|type1|2012-01-12| 2|
|name1|type3|2012-01-12| 3|
|name1|type3|2012-01-11| 55|
|name1|type1|2012-01-13| 10|
|name1|type2|2012-01-14| 11|
|name1|type2|2012-01-15| 14|
|name2|type2|2012-01-10| 2|
|name2|type2|2012-01-11| 3|
|name2|type2|2012-01-12| 55|
|name2|type1|2012-01-10| 10|
|name2|type1|2012-01-13| 55|
|name2|type1|2012-01-14| 10|
+-----+-----+----------+-----+
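In this dataframe, I want to compute the average and the sum of the scores for each name over a rolling three-day time window. That is, for any given date in the dataframe, find the sum of name1's scores on that day, the day before it, and the day before that. Do the same for every day of name1, and repeat the exercise for the other names, viz. name2, etc. How can I do this?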
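To pin down the intended semantics independently of Spark, here is a minimal plain-Python sketch (no Spark involved). For every row it sums and averages all scores of the same name whose date lies between two days before that row's date and the row's date, inclusive. The helper name rolling_3day and the tuple layout are assumptions made for illustration only.

```python
from datetime import date, timedelta

# Sample rows from the question: (name, type, timestamp, score)
ROWS = [
    ("name1", "type1", "2012-01-10", 11), ("name1", "type1", "2012-01-11", 14),
    ("name1", "type1", "2012-01-12", 2), ("name1", "type3", "2012-01-12", 3),
    ("name1", "type3", "2012-01-11", 55), ("name1", "type1", "2012-01-13", 10),
    ("name1", "type2", "2012-01-14", 11), ("name1", "type2", "2012-01-15", 14),
    ("name2", "type2", "2012-01-10", 2), ("name2", "type2", "2012-01-11", 3),
    ("name2", "type2", "2012-01-12", 55), ("name2", "type1", "2012-01-10", 10),
    ("name2", "type1", "2012-01-13", 55), ("name2", "type1", "2012-01-14", 10),
]


def rolling_3day(rows, window_days=3):
    """Return (name, timestamp, rolling_sum, rolling_avg) for each input row.

    For a row dated d, the window covers d - (window_days - 1) .. d inclusive,
    restricted to rows with the same name (the "partition").
    """
    parsed = [(n, date.fromisoformat(ts), s) for n, _type, ts, s in rows]
    result = []
    for name, day, _score in parsed:
        lower = day - timedelta(days=window_days - 1)
        in_window = [s for n, d, s in parsed if n == name and lower <= d <= day]
        total = sum(in_window)
        result.append((name, day.isoformat(), total, total / len(in_window)))
    return result


for row in rolling_3day(ROWS):
    print(row)
```

Each input row keeps its own output row, so two rows on the same day for the same name get the same rolling values, which is also how a window function behaves.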
I looked at this post and tried the following:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
days = lambda i: i*1
w_rolling = Window.orderBy(F.col("timestamp").cast("long")).rangeBetween(-days(3), 0)
df_agg = df.withColumn("rolling_average", F.avg("score").over(w_rolling)).withColumn(
"rolling_sum", F.sum("score").over(w_rolling)
)
df_agg.show()
+-----+-----+----------+-----+------------------+-----------+
| name| type| timestamp|score| rolling_average|rolling_sum|
+-----+-----+----------+-----+------------------+-----------+
|name1|type1|2012-01-10| 11|18.214285714285715| 255|
|name1|type1|2012-01-11| 14|18.214285714285715| 255|
|name1|type1|2012-01-12| 2|18.214285714285715| 255|
|name1|type3|2012-01-12| 3|18.214285714285715| 255|
|name1|type3|2012-01-11| 55|18.214285714285715| 255|
|name1|type1|2012-01-13| 10|18.214285714285715| 255|
|name1|type2|2012-01-14| 11|18.214285714285715| 255|
|name1|type2|2012-01-15| 14|18.214285714285715| 255|
|name2|type2|2012-01-10| 2|18.214285714285715| 255|
|name2|type2|2012-01-11| 3|18.214285714285715| 255|
|name2|type2|2012-01-12| 55|18.214285714285715| 255|
|name2|type1|2012-01-10| 10|18.214285714285715| 255|
|name2|type1|2012-01-13| 55|18.214285714285715| 255|
|name2|type1|2012-01-14| 10|18.214285714285715| 255|
+-----+-----+----------+-----+------------------+-----------+
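As you can see, I always get the same rolling average and rolling sum, which are nothing but the average and sum of the score column over all days. This is not what I want.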
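A likely explanation, assuming Spark's non-ANSI cast behaviour (ANSI mode off, which was the default before Spark 4.0; with ANSI mode on, the cast raises an error instead): F.col("timestamp").cast("long") on a 'yyyy-MM-dd' string yields null for every row, and the window has no partitionBy. When every ordering value is null, all rows are peers, so each row's range frame is the whole dataframe, giving 255 and 255 / 14 ≈ 18.214. Separately, even with a valid numeric key, rangeBetween(-days(3), 0) spans 3 units of whatever that key measures (seconds for a Unix timestamp) and includes the current value plus three before it, i.e. four days rather than three. The toy model below is plain Python, not Spark's implementation; only its all-null case is meant to mirror the question.

```python
from typing import List, Optional


def range_frame_sum(keys: List[Optional[int]], values: List[int],
                    lower: int, upper: int) -> List[int]:
    """Toy RANGE-frame sum over one partition (illustration, not Spark's code).

    A row with a non-null key k sums the values of rows whose non-null key
    lies in [k + lower, k + upper]; a row with a null key sums the values of
    all rows whose key is also null (its peers).
    """
    sums = []
    for k in keys:
        if k is None:
            frame = [v for kk, v in zip(keys, values) if kk is None]
        else:
            frame = [v for kk, v in zip(keys, values)
                     if kk is not None and k + lower <= kk <= k + upper]
        sums.append(sum(frame))
    return sums


# All 14 scores in the order of the question's table; no partitionBy, so one partition.
scores = [11, 14, 2, 3, 55, 10, 11, 14, 2, 3, 55, 10, 55, 10]
null_keys = [None] * len(scores)  # stands in for cast("long") of 'yyyy-MM-dd' strings
print(range_frame_sum(null_keys, scores, -3, 0))  # 255 for every row

# name1 only, keyed by day of month: a lower bound of -3 spans four days.
name1_days = [10, 11, 12, 12, 11, 13, 14, 15]
name1_scores = [11, 14, 2, 3, 55, 10, 11, 14]
print(range_frame_sum(name1_days, name1_scores, -3, 0)[5])  # 95 (2012-01-10 .. 2012-01-13)
print(range_frame_sum(name1_days, name1_scores, -2, 0)[5])  # 84 (2012-01-11 .. 2012-01-13)
```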
You can create the above dataframe with the following code snippet:
from pyspark.sql import Row, SparkSession
spark = SparkSession.builder.getOrCreate()
df_Stats = Row("name", "type", "timestamp", "score")
df_stat1 = df_Stats("name1", "type1", "2012-01-10", 11)
df_stat2 = df_Stats("name1", "type1", "2012-01-11", 14)
df_stat3 = df_Stats("name1", "type1", "2012-01-12", 2)
df_stat4 = df_Stats("name1", "type3", "2012-01-12", 3)
df_stat5 = df_Stats("name1", "type3", "2012-01-11", 55)
df_stat6 = df_Stats("name1", "type1", "2012-01-13", 10)
df_stat7 = df_Stats("name1", "type2", "2012-01-14", 11)
df_stat8 = df_Stats("name1", "type2", "2012-01-15", 14)
df_stat9 = df_Stats("name2", "type2", "2012-01-10", 2)
df_stat10 = df_Stats("name2", "type2", "2012-01-11", 3)
df_stat11 = df_Stats("name2", "type2", "2012-01-12", 55)
df_stat12 = df_Stats("name2", "type1", "2012-01-10", 10)
df_stat13 = df_Stats("name2", "type1", "2012-01-13", 55)
df_stat14 = df_Stats("name2", "type1", "2012-01-14", 10)
df_stat_lst = [
df_stat1,
df_stat2,
df_stat3,
df_stat4,
df_stat5,
df_stat6,
df_stat7,
df_stat8,
df_stat9,
df_stat10,
df_stat11,
df_stat12,
df_stat13,
df_stat14
]
df = spark.createDataFrame(df_stat_lst)
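You can use the following code to compute the sum and average of the scores over the past 3 days (including the current day). The idea is to order the window by a numeric Unix timestamp in seconds, partition it by name, and express the frame in seconds: rangeBetween(-2*86400, 0) covers the current day and the two days before it.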
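# Considering the dataframe already created using code provided in question
# Convert the 'yyyy-MM-dd' string to seconds since the epoch (uses the Spark session time zone)
df = df.withColumn('unix_time', F.unix_timestamp('timestamp', 'yyyy-MM-dd'))
# One window per name, ordered by time; the frame runs from 2 days (2 * 86400 s)
# before the current row up to the current row, i.e. the current day plus the two before it
winSpec = Window.partitionBy('name').orderBy('unix_time').rangeBetween(-2*86400, 0)
df = df.withColumn('rolling_sum', F.sum('score').over(winSpec))
df = df.withColumn('rolling_avg', F.avg('score').over(winSpec))
df.orderBy('name', 'timestamp').show(20, False)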
+-----+-----+----------+-----+----------+-----------+------------------+
|name |type |timestamp |score|unix_time |rolling_sum|rolling_avg |
+-----+-----+----------+-----+----------+-----------+------------------+
|name1|type1|2012-01-10|11 |1326153600|11 |11.0 |
|name1|type3|2012-01-11|55 |1326240000|80 |26.666666666666668|
|name1|type1|2012-01-11|14 |1326240000|80 |26.666666666666668|
|name1|type1|2012-01-12|2 |1326326400|85 |17.0 |
|name1|type3|2012-01-12|3 |1326326400|85 |17.0 |
|name1|type1|2012-01-13|10 |1326412800|84 |16.8 |
|name1|type2|2012-01-14|11 |1326499200|26 |6.5 |
|name1|type2|2012-01-15|14 |1326585600|35 |11.666666666666666|
|name2|type1|2012-01-10|10 |1326153600|12 |6.0 |
|name2|type2|2012-01-10|2 |1326153600|12 |6.0 |
+-----+-----+----------+-----+----------+-----------+------------------+
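Why -2*86400 rather than -3*86400: unix_time is in seconds, one day is 86400 s, and the frame [current - 2*86400, current] holds exactly the current day and the two before it. The sketch below redoes that arithmetic in plain Python for name1's 2012-01-12 rows, assuming dates are read as UTC midnight, which matches the unix_time values in the table (F.unix_timestamp itself uses the Spark session time zone). The helper unix_midnight_utc is hypothetical. In a session time zone with daylight-saving changes, midnights are not always 86400 s apart, so an integer day number (for example one computed with F.datediff) could be a more robust ordering key.

```python
from datetime import datetime, timezone

SECONDS_PER_DAY = 86400


def unix_midnight_utc(day: str) -> int:
    """Seconds since the epoch for a 'yyyy-MM-dd' date at 00:00 UTC (hypothetical helper)."""
    parsed = datetime.strptime(day, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    return int(parsed.timestamp())


# Frame bounds for the 2012-01-12 rows under rangeBetween(-2*86400, 0)
current = unix_midnight_utc("2012-01-12")
lower = current - 2 * SECONDS_PER_DAY
print(current, lower)  # 1326326400 1326153600

# name1 rows as (timestamp, score); keep those whose unix time lies in [lower, current]
name1 = [("2012-01-10", 11), ("2012-01-11", 14), ("2012-01-12", 2), ("2012-01-12", 3),
         ("2012-01-11", 55), ("2012-01-13", 10), ("2012-01-14", 11), ("2012-01-15", 14)]
in_frame = [score for day, score in name1 if lower <= unix_midnight_utc(day) <= current]
print(sum(in_frame), sum(in_frame) / len(in_frame))  # 85 17.0
```

The lower bound lands exactly on 2012-01-10 at midnight, so the frame contains the 01-10, 01-11 and 01-12 rows, which reproduces the 85 / 17.0 shown for those rows.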