PySpark:如何创建自上次事件以来的时间计数器和基于事件的唯一标识符?

PySpark: How to create a time since last event counter and unique identifiers based on event?

我有一个看起来像这样的数据框。要在标准 pandas 中执行所需的操作,我将执行以下操作:

import pandas as pd
case = pd.Series(['A', 'A', 'A', 'A',
                  'B', 'B', 'B', 'B',
                  'C', 'C', 'C', 'C'])
y = pd.Series([0, 1, 0, 0,
               0, 1, 0, 0,
               0, 0, 1, 0])
year = [2016, 2017, 2018, 2019,
        2016, 2017, 2018, 2019,
        2016, 2017, 2018, 2019]

dict = {'case': case, 'y': y, 'year': year}
df = pd.DataFrame(dict)

# the transformations of interest
df['case_id'] = ((~(df.case == df.case.shift())) | (df.y.shift()==1)).cumsum()
df['counter'] = df.groupby(((df['case_id'] != df['case_id'].shift(1))).cumsum()).cumcount()

我正在寻求有关如何将这两个命令转换为 PySpark 数据帧的帮助。

df['case_id'] = ((~(df.case == df.case.shift())) | (df.y.shift()==1)).cumsum()
df['counter'] = df.groupby(((df['case_id'] != df['case_id'].shift(1))).cumsum()).cumcount()

预期输出如下:


  case  y   year    case_id counter
    A   0   2016    1        0
    A   1   2017    1        1
    A   0   2018    2        0
    A   0   2019    2        1
    B   0   2016    3        0
    B   1   2017    3        1
    B   0   2018    4        0
    B   0   2019    4        1
    C   0   2016    5        0
    C   0   2017    5        1
    C   1   2018    5        2
    C   0   2019    6        0

这几乎就像一个常见问题解答,另请参阅我的旧 中的另一个示例。对于此示例,您可以尝试以下操作:

from pyspark.sql import functions as F
from pyspark.sql import Window

pdf = spark.createDataFrame(df)

w1 = Window.partitionBy().orderBy('case', 'year')
w2 = Window.partitionBy('case_id').orderBy('case', 'year')

df_new = pdf.withColumn("case_id", F.sum(F.when(~(F.col("case") == F.lag("case").over(w1)) | (F.lag("y",1,0).over(w1) == 1),1).otherwise(0)).over(w1)+1) \
    .withColumn('counter', F.count('*').over(w2)-1)

df_new.show()
+----+---+----+-------+-------+
|case|  y|year|case_id|counter|
+----+---+----+-------+-------+
|   A|  0|2016|      1|      0|
|   A|  1|2017|      1|      1|
|   A|  0|2018|      2|      0|
|   A|  0|2019|      2|      1|
|   B|  0|2016|      3|      0|
|   B|  1|2017|      3|      1|
|   B|  0|2018|      4|      0|
|   B|  0|2019|      4|      1|
|   C|  0|2016|      5|      0|
|   C|  0|2017|      5|      1|
|   C|  1|2018|      5|      2|
|   C|  0|2019|      6|      0|
+----+---+----+-------+-------+

其中:

  1. 设置 WindSpec w1 以按 caseyear 对行进行排序,然后使用 lag 函数查找前一个值(类似于在 pandas) 中移动

    pandas: (~(df.case == df.case.shift())) | (df.y.shift()==1) 
    pyspark: ~(F.col("case") == F.lag("case").over(w1)) | (F.lag("y",1,0).over(w1) == 1)
    

    注: (1) w1 中的 orderBy 很重要,因为 partitionBy 会触发数据混洗,否则无法保证结果行的顺序。 (2) 注意 null 值使用 lag 函数,如果需要,使用 lag 函数的第三个参数或 coalesce 函数设置默认值。

  2. F.when(..,1).otherwise(0)把(1)的结果从boolean转换成int然后做 cumsum:

    pandas: df.c.cumsum()
    pyspark: F.sum(c).over(w1)+1
    
  3. add case_id into partitionBy 设置 w2 然后做 cumcount (不需要再做cumsum 然后 groupby):

    pandas: df.groupby(..).cumcount()
    pyspark: F.count('*').over(w2)-1
    

对于大型数据帧,设置没有 partitionBy 的 WinSpec 会将所有数据移动到一个分区中,这可能会产生 OOM 错误。事实上,如果你只是在 case + case_id 的每个组合中寻找 cumcount,你更有可能执行以下操作:

w1 = Window.partitionBy('case').orderBy('year')
w2 = Window.partitionBy('case', 'case_id').orderBy('year')

df_new = pdf.withColumn("case_id", F.sum(F.when(F.lag("y",1,0).over(w1) == 1,1).otherwise(0)).over(w1)) \
    .withColumn('counter', F.count('*').over(w2)-1)

df_new.show()
+----+---+----+-------+-------+
|case|  y|year|case_id|counter|
+----+---+----+-------+-------+
|   B|  0|2016|      0|      0|
|   B|  1|2017|      0|      1|
|   B|  0|2018|      1|      0|
|   B|  0|2019|      1|      1|
|   C|  0|2016|      0|      0|
|   C|  0|2017|      0|      1|
|   C|  1|2018|      0|      2|
|   C|  0|2019|      1|      0|
|   A|  0|2016|      0|      0|
|   A|  1|2017|      0|      1|
|   A|  0|2018|      1|      0|
|   A|  0|2019|      1|      1|
+----+---+----+-------+-------+