在给定时间 window 在 Pyspark 中查找行数

FInd number of rows in given time window in Pyspark

我有一个 PySpark 数据框,下面给出了其中的一小部分:

+------+-----+-------------------+-----+
|  name| type|          timestamp|score|
+------+-----+-------------------+-----+
| name1|type1|2012-01-10 00:00:00|   11|
| name1|type1|2012-01-10 00:00:10|   14|
| name1|type1|2012-01-10 00:00:20|    2|
| name1|type1|2012-01-10 00:00:30|    3|
| name1|type1|2012-01-10 00:00:40|   55|
| name1|type1|2012-01-10 00:00:50|   10|
| name5|type1|2012-01-10 00:01:00|    5|
| name2|type2|2012-01-10 00:01:10|    8|
| name5|type1|2012-01-10 00:01:20|    1|
|name10|type1|2012-01-10 00:01:30|   12|
|name11|type3|2012-01-10 00:01:40|  512|
+------+-----+-------------------+-----+

对于选定的时间 window(例如,假设 5 days),我想找出 score 的值有多少(比如 num_values_week)每个 name 都有吗?也就是说,name12012-01-10 - 2012-01-14 之间有多少 score 的值,然后在 2012-01-15 - 2012-01-29 之间等等(对于所有其他名称,如 name2 等等。)

我想将此信息投射到新的 PySpark 数据框中,该数据框将包含 nametypenum_values_week 列。我该怎么做?

在我之前问过的 中,我看到了如何在选择一周的时间间隔时获得(分数)计数。但是,在这个问题中,我想知道如何在 window 时间内选择任何可调整的值(如 5 days 左右)来计算分数。

任何帮助将不胜感激。

使用pd.Grouper with DataFrame.groupy:

#df['timestamp']=pd.to_datetime(df['timestamp']) #to convert to datetime
new_df=( df.groupby([pd.Grouper(key='timestamp',freq='5D'),'name'],sort=False)
          .score
          .count()
          .rename('num_values_week')
          .reset_index() )
print(new_df)

输出

   timestamp    name  num_values_week
0 2012-01-10   name1                6
1 2012-01-10   name5                2
2 2012-01-10   name2                1
3 2012-01-10  name10                1
4 2012-01-10  name11                1

GroupBy.resample:

new_df=( df.groupby('name',sort=False)
           .resample('5D',on='timestamp')
           .count()
           .score
           .rename('num_values_week')
           .reset_index() )
print(new_df)

输出

     name  timestamp  num_values_week
0   name1 2012-01-10                6
1   name5 2012-01-10                2
2   name2 2012-01-10                1
3  name10 2012-01-10                1
4  name11 2012-01-10                1

如果您想在原始 df 中创建一个新列,请使用 transform:

df['num_values_week']=df.groupby([pd.Grouper(key='timestamp',freq='5D'),'name']).score.transform('count')
print(df)

      name   type           timestamp  score  num_values_week
0    name1  type1 2012-01-10 00:00:00     11                6
1    name1  type1 2012-01-10 00:00:10     14                6
2    name1  type1 2012-01-10 00:00:20      2                6
3    name1  type1 2012-01-10 00:00:30      3                6
4    name1  type1 2012-01-10 00:00:40     55                6
5    name1  type1 2012-01-10 00:00:50     10                6
6    name5  type1 2012-01-10 00:01:00      5                2
7    name2  type2 2012-01-10 00:01:10      8                1
8    name5  type1 2012-01-10 00:01:20      1                2
9   name10  type1 2012-01-10 00:01:30     12                1
10  name11  type3 2012-01-10 00:01:40    512                1