PySpark:如何创建自上次事件以来的时间计数器和基于事件的唯一标识符?
PySpark: How to create a time since last event counter and unique identifiers based on event?
我有一个看起来像这样的数据框。要在标准 pandas 中执行所需的操作,我将执行以下操作:
import pandas as pd
case = pd.Series(['A', 'A', 'A', 'A',
'B', 'B', 'B', 'B',
'C', 'C', 'C', 'C'])
y = pd.Series([0, 1, 0, 0,
0, 1, 0, 0,
0, 0, 1, 0])
year = [2016, 2017, 2018, 2019,
2016, 2017, 2018, 2019,
2016, 2017, 2018, 2019]
dict = {'case': case, 'y': y, 'year': year}
df = pd.DataFrame(dict)
# the transformations of interest
df['case_id'] = ((~(df.case == df.case.shift())) | (df.y.shift()==1)).cumsum()
df['counter'] = df.groupby(((df['case_id'] != df['case_id'].shift(1))).cumsum()).cumcount()
我正在寻求有关如何将这两个命令转换为 PySpark 数据帧的帮助。
df['case_id'] = ((~(df.case == df.case.shift())) | (df.y.shift()==1)).cumsum()
df['counter'] = df.groupby(((df['case_id'] != df['case_id'].shift(1))).cumsum()).cumcount()
预期输出如下:
case y year case_id counter
A 0 2016 1 0
A 1 2017 1 1
A 0 2018 2 0
A 0 2019 2 1
B 0 2016 3 0
B 1 2017 3 1
B 0 2018 4 0
B 0 2019 4 1
C 0 2016 5 0
C 0 2017 5 1
C 1 2018 5 2
C 0 2019 6 0
这几乎就像一个常见问题解答,另请参阅我的旧 中的另一个示例。对于此示例,您可以尝试以下操作:
from pyspark.sql import functions as F
from pyspark.sql import Window
pdf = spark.createDataFrame(df)
w1 = Window.partitionBy().orderBy('case', 'year')
w2 = Window.partitionBy('case_id').orderBy('case', 'year')
df_new = pdf.withColumn("case_id", F.sum(F.when(~(F.col("case") == F.lag("case").over(w1)) | (F.lag("y",1,0).over(w1) == 1),1).otherwise(0)).over(w1)+1) \
.withColumn('counter', F.count('*').over(w2)-1)
df_new.show()
+----+---+----+-------+-------+
|case| y|year|case_id|counter|
+----+---+----+-------+-------+
| A| 0|2016| 1| 0|
| A| 1|2017| 1| 1|
| A| 0|2018| 2| 0|
| A| 0|2019| 2| 1|
| B| 0|2016| 3| 0|
| B| 1|2017| 3| 1|
| B| 0|2018| 4| 0|
| B| 0|2019| 4| 1|
| C| 0|2016| 5| 0|
| C| 0|2017| 5| 1|
| C| 1|2018| 5| 2|
| C| 0|2019| 6| 0|
+----+---+----+-------+-------+
其中:
设置 WindSpec w1
以按 case
、year
对行进行排序,然后使用 lag 函数查找前一个值(类似于在 pandas) 中移动 。
pandas: (~(df.case == df.case.shift())) | (df.y.shift()==1)
pyspark: ~(F.col("case") == F.lag("case").over(w1)) | (F.lag("y",1,0).over(w1) == 1)
注:
(1) w1
中的 orderBy 很重要,因为 partitionBy 会触发数据混洗,否则无法保证结果行的顺序。 (2) 注意 null 值使用 lag 函数,如果需要,使用 lag 函数的第三个参数或 coalesce 函数设置默认值。
用F.when(..,1).otherwise(0)
把(1)的结果从boolean转换成int然后做 cumsum
:
pandas: df.c.cumsum()
pyspark: F.sum(c).over(w1)+1
add case_id into partitionBy 设置 w2
然后做 cumcount
(不需要再做cumsum
然后 groupby
):
pandas: df.groupby(..).cumcount()
pyspark: F.count('*').over(w2)-1
对于大型数据帧,设置没有 partitionBy
的 WinSpec 会将所有数据移动到一个分区中,这可能会产生 OOM 错误。事实上,如果你只是在 case + case_id 的每个组合中寻找 cumcount
,你更有可能执行以下操作:
w1 = Window.partitionBy('case').orderBy('year')
w2 = Window.partitionBy('case', 'case_id').orderBy('year')
df_new = pdf.withColumn("case_id", F.sum(F.when(F.lag("y",1,0).over(w1) == 1,1).otherwise(0)).over(w1)) \
.withColumn('counter', F.count('*').over(w2)-1)
df_new.show()
+----+---+----+-------+-------+
|case| y|year|case_id|counter|
+----+---+----+-------+-------+
| B| 0|2016| 0| 0|
| B| 1|2017| 0| 1|
| B| 0|2018| 1| 0|
| B| 0|2019| 1| 1|
| C| 0|2016| 0| 0|
| C| 0|2017| 0| 1|
| C| 1|2018| 0| 2|
| C| 0|2019| 1| 0|
| A| 0|2016| 0| 0|
| A| 1|2017| 0| 1|
| A| 0|2018| 1| 0|
| A| 0|2019| 1| 1|
+----+---+----+-------+-------+
我有一个看起来像这样的数据框。要在标准 pandas 中执行所需的操作,我将执行以下操作:
import pandas as pd
case = pd.Series(['A', 'A', 'A', 'A',
'B', 'B', 'B', 'B',
'C', 'C', 'C', 'C'])
y = pd.Series([0, 1, 0, 0,
0, 1, 0, 0,
0, 0, 1, 0])
year = [2016, 2017, 2018, 2019,
2016, 2017, 2018, 2019,
2016, 2017, 2018, 2019]
dict = {'case': case, 'y': y, 'year': year}
df = pd.DataFrame(dict)
# the transformations of interest
df['case_id'] = ((~(df.case == df.case.shift())) | (df.y.shift()==1)).cumsum()
df['counter'] = df.groupby(((df['case_id'] != df['case_id'].shift(1))).cumsum()).cumcount()
我正在寻求有关如何将这两个命令转换为 PySpark 数据帧的帮助。
df['case_id'] = ((~(df.case == df.case.shift())) | (df.y.shift()==1)).cumsum()
df['counter'] = df.groupby(((df['case_id'] != df['case_id'].shift(1))).cumsum()).cumcount()
预期输出如下:
case y year case_id counter
A 0 2016 1 0
A 1 2017 1 1
A 0 2018 2 0
A 0 2019 2 1
B 0 2016 3 0
B 1 2017 3 1
B 0 2018 4 0
B 0 2019 4 1
C 0 2016 5 0
C 0 2017 5 1
C 1 2018 5 2
C 0 2019 6 0
这几乎就像一个常见问题解答,另请参阅我的旧
from pyspark.sql import functions as F
from pyspark.sql import Window
pdf = spark.createDataFrame(df)
w1 = Window.partitionBy().orderBy('case', 'year')
w2 = Window.partitionBy('case_id').orderBy('case', 'year')
df_new = pdf.withColumn("case_id", F.sum(F.when(~(F.col("case") == F.lag("case").over(w1)) | (F.lag("y",1,0).over(w1) == 1),1).otherwise(0)).over(w1)+1) \
.withColumn('counter', F.count('*').over(w2)-1)
df_new.show()
+----+---+----+-------+-------+
|case| y|year|case_id|counter|
+----+---+----+-------+-------+
| A| 0|2016| 1| 0|
| A| 1|2017| 1| 1|
| A| 0|2018| 2| 0|
| A| 0|2019| 2| 1|
| B| 0|2016| 3| 0|
| B| 1|2017| 3| 1|
| B| 0|2018| 4| 0|
| B| 0|2019| 4| 1|
| C| 0|2016| 5| 0|
| C| 0|2017| 5| 1|
| C| 1|2018| 5| 2|
| C| 0|2019| 6| 0|
+----+---+----+-------+-------+
其中:
设置 WindSpec
w1
以按case
、year
对行进行排序,然后使用 lag 函数查找前一个值(类似于在 pandas) 中移动 。pandas: (~(df.case == df.case.shift())) | (df.y.shift()==1) pyspark: ~(F.col("case") == F.lag("case").over(w1)) | (F.lag("y",1,0).over(w1) == 1)
注: (1)
w1
中的 orderBy 很重要,因为 partitionBy 会触发数据混洗,否则无法保证结果行的顺序。 (2) 注意 null 值使用 lag 函数,如果需要,使用 lag 函数的第三个参数或 coalesce 函数设置默认值。用
F.when(..,1).otherwise(0)
把(1)的结果从boolean转换成int然后做cumsum
:pandas: df.c.cumsum() pyspark: F.sum(c).over(w1)+1
add case_id into partitionBy 设置
w2
然后做cumcount
(不需要再做cumsum
然后groupby
):pandas: df.groupby(..).cumcount() pyspark: F.count('*').over(w2)-1
对于大型数据帧,设置没有 partitionBy
的 WinSpec 会将所有数据移动到一个分区中,这可能会产生 OOM 错误。事实上,如果你只是在 case + case_id 的每个组合中寻找 cumcount
,你更有可能执行以下操作:
w1 = Window.partitionBy('case').orderBy('year')
w2 = Window.partitionBy('case', 'case_id').orderBy('year')
df_new = pdf.withColumn("case_id", F.sum(F.when(F.lag("y",1,0).over(w1) == 1,1).otherwise(0)).over(w1)) \
.withColumn('counter', F.count('*').over(w2)-1)
df_new.show()
+----+---+----+-------+-------+
|case| y|year|case_id|counter|
+----+---+----+-------+-------+
| B| 0|2016| 0| 0|
| B| 1|2017| 0| 1|
| B| 0|2018| 1| 0|
| B| 0|2019| 1| 1|
| C| 0|2016| 0| 0|
| C| 0|2017| 0| 1|
| C| 1|2018| 0| 2|
| C| 0|2019| 1| 0|
| A| 0|2016| 0| 0|
| A| 1|2017| 0| 1|
| A| 0|2018| 1| 0|
| A| 0|2019| 1| 1|
+----+---+----+-------+-------+