Find number of rows in a given week in PySpark
I have a PySpark dataframe, a small portion of which is given below:
+------+-----+-------------------+-----+
| name| type| timestamp|score|
+------+-----+-------------------+-----+
| name1|type1|2012-01-10 00:00:00| 11|
| name1|type1|2012-01-10 00:00:10| 14|
| name1|type1|2012-01-10 00:00:20| 2|
| name1|type1|2012-01-10 00:00:30| 3|
| name1|type1|2012-01-10 00:00:40| 55|
| name1|type1|2012-01-10 00:00:50| 10|
| name5|type1|2012-01-10 00:01:00| 5|
| name2|type2|2012-01-10 00:01:10| 8|
| name5|type1|2012-01-10 00:01:20| 1|
|name10|type1|2012-01-10 00:01:30| 12|
|name11|type3|2012-01-10 00:01:40| 512|
+------+-----+-------------------+-----+
For a chosen time window (say, windows of 1 week), I want to find how many values of score (call it num_values_week) each name has. That is, how many values of score name1 has between 2012-01-10 and 2012-01-16, then between 2012-01-17 and 2012-01-23, and so on (and likewise for all the other names, such as name2, etc.).

I would like to project this information into a new PySpark dataframe that contains the columns name, type, and num_values_week. How can I do this?
The PySpark dataframe shown above can be created with the following code snippet:
from pyspark.sql import Row
import pyspark.sql.functions as F
df_Stats = Row("name", "type", "timestamp", "score")
df_stat1 = df_Stats('name1', 'type1', "2012-01-10 00:00:00", 11)
df_stat2 = df_Stats('name2', 'type2', "2012-01-10 00:00:00", 14)
df_stat3 = df_Stats('name3', 'type3', "2012-01-10 00:00:00", 2)
df_stat4 = df_Stats('name4', 'type1', "2012-01-17 00:00:00", 3)
df_stat5 = df_Stats('name5', 'type3', "2012-01-10 00:00:00", 55)
df_stat6 = df_Stats('name2', 'type2', "2012-01-17 00:00:00", 10)
df_stat7 = df_Stats('name7', 'type3', "2012-01-24 00:00:00", 5)
df_stat8 = df_Stats('name8', 'type2', "2012-01-17 00:00:00", 8)
df_stat9 = df_Stats('name1', 'type1', "2012-01-24 00:00:00", 1)
df_stat10 = df_Stats('name10', 'type2', "2012-01-17 00:00:00", 12)
df_stat11 = df_Stats('name11', 'type3', "2012-01-24 00:00:00", 512)
df_stat_lst = [df_stat1, df_stat2, df_stat3, df_stat4, df_stat5,
               df_stat6, df_stat7, df_stat8, df_stat9, df_stat10, df_stat11]
df = spark.createDataFrame(df_stat_lst)
Like this:
from pyspark.sql.functions import weekofyear, count

df = df.withColumn("week_nr", weekofyear(df.timestamp))  # derive the week number first
# for every week, count how many score rows each name has
result = df.groupBy(["week_nr", "name", "type"]).agg(
    count("score").alias("num_values_week")
)