为重复的行序列分配标识符
Assign an Identifier for repeating sequence of rows
我有一个数据框,我需要在其中生成 'CycleID' 列,如下所示:
+-------+-------+----------+---------+
| type | stage | Timestamp| CycleID |
+-------+-------+----------+---------+
| type1 | s1 | a | 1 |
| type1 | s2 | b | 1 |
| type1 | s2 | c | 1 |
| type1 | s3 | d | 1 |
| type1 | s1 | e | 2 |
| type1 | s2 | f | 2 |
| type1 | s3 | g | 2 |
| type2 | s1 | a | 1 |
| type2 | s2 | b | 1 |
| type2 | s3 | c | 1 |
+-------+-------+----------+---------+
数据约束
- 一个类型的每个周期都发生了 3 个预定阶段
按顺序。
- 循环中的各个阶段可以重复,但是
他们不能乱序发生。
例如,阶段
s1
永远不会发生在阶段 s2
.
之后
- 时间戳保证在每个阶段的行之间递增。例如:
b > a
.
目标是有一个新列“CycleID
”,用于唯一标识每种类型的循环。
到目前为止我尝试过的:
w = Window.partitionBy("type").orderBy("Timestamp")
inputdf = inputdf.withColumn("stagenum", func.expr("substring(stage, 2)")).withColumn("stagenum", col("stagenum").cast(IntegerType()))
inputdf = inputdf.withColumn("temp", func.when((col("stagenum") - func.lag("stagenum", 1).over(w)).isNull() | \
(col("stagenum") - func.lag("stagenum", 1).over(w) == func.lit(0)) |\
(col("stagenum") - func.lag("stagenum", 1).over(w) == func.lit(1)), func.lit(1)).otherwise(func.lit(100)))
除此之外,我尝试了使用 lag() 的不同方法,但似乎没有一种干净的方法来分配 CycleId。
寻求帮助。
数据
l=[('type1' , 's1' , 'a' , 1),('type1','s2' , 'b' , 1 ),('type1' , 's1' , 'a' , 1),('type1','s2' , 'b' , 1 ), ('type1' , 's2' , 'c' , 1), ('type1' , 's3' , 'd' , 1),('type1' , 's1' , 'e' , 1),('type1','s2' , 'f' , 1 ), ('type1' , 's3' , 'g' , 1)]
df=spark.createDataFrame(l,['type' , 'stage' , 'Timestamp', 'CycleID'])
df.show()
解决方案
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import *
df=(
df.withColumn('CycleID',col('stage')=='s1')#Generate Booleans through Selection
.withColumn('CycleID', F.sum(F.col('CycleID').cast('integer'))#Convert Boolean to intergers
.over(Window.partitionBy().orderBy().rowsBetween(-sys.maxsize, 0)))#rowsBetween(-sys.maxsize, 0) along with sum function is used to create cumulative sum of the column
)
df.show()
+-----+-----+---------+-------+
| type|stage|Timestamp|CycleID|
+-----+-----+---------+-------+
|type1| s1| a| 1|
|type1| s2| b| 1|
|type1| s2| c| 1|
|type1| s3| d| 1|
|type1| s1| e| 2|
|type1| s2| f| 2|
|type1| s3| g| 2|
+-----+-----+---------+-------+
根据您的以下评论:
请对 s3
进行降序和布尔 select 排序。下面的代码
df.sort(col('Timestamp').desc()).withColumn('CycleID',(col('stage')=='s3')).withColumn('CycleID', F.sum(F.col('CycleID').cast('integer')).over(Window.partitionBy().orderBy().rowsBetween(-sys.maxsize, 0))).show()
+-----+-----+---------+-------+
| type|stage|Timestamp|CycleID|
+-----+-----+---------+-------+
|type1| s3| g| 1|
|type1| s2| f| 1|
|type1| s1| e| 1|
|type1| s3| d| 2|
|type1| s2| c| 2|
|type1| s2| b| 2|
|type1| s2| b| 2|
|type1| s1| a| 2|
|type1| s1| a| 2|
+-----+-----+---------+-------+
如果你有多个s3.使用滞后如下;
m=Window.partitionBy()#.orderBy(F.desc('Timestamp'))
df1=df.select("*", lag("stage").over(m.orderBy(col("Timestamp"))).alias("CycleID1"))
df1.withColumn('CycleID',(((col('stage')=='s1')&(col('CycleID1').isNull()))|((col('stage')=='s1')&(col('CycleID1')=='s3')))).withColumn('CycleID', F.sum(F.col('CycleID').cast('integer')).over(m.rowsBetween(-sys.maxsize, 0))).drop('CycleID1').show()
+-----+-----+---------+-------+
| type|stage|Timestamp|CycleID|
+-----+-----+---------+-------+
|type1| s1| a| 1|
|type1| s1| a| 1|
|type1| s2| b| 1|
|type1| s2| b| 1|
|type1| s2| c| 1|
|type1| s3| d| 1|
|type1| s1| e| 2|
|type1| s2| f| 2|
|type1| s3| g| 2|
+-----+-----+---------+-------+
我有一个数据框,我需要在其中生成 'CycleID' 列,如下所示:
+-------+-------+----------+---------+
| type | stage | Timestamp| CycleID |
+-------+-------+----------+---------+
| type1 | s1 | a | 1 |
| type1 | s2 | b | 1 |
| type1 | s2 | c | 1 |
| type1 | s3 | d | 1 |
| type1 | s1 | e | 2 |
| type1 | s2 | f | 2 |
| type1 | s3 | g | 2 |
| type2 | s1 | a | 1 |
| type2 | s2 | b | 1 |
| type2 | s3 | c | 1 |
+-------+-------+----------+---------+
数据约束
- 一个类型的每个周期都发生了 3 个预定阶段
按顺序。
- 循环中的各个阶段可以重复,但是
他们不能乱序发生。
例如,阶段
s1
永远不会发生在阶段s2
. 之后
- 时间戳保证在每个阶段的行之间递增。例如:
b > a
.
目标是有一个新列“CycleID
”,用于唯一标识每种类型的循环。
到目前为止我尝试过的:
w = Window.partitionBy("type").orderBy("Timestamp")
inputdf = inputdf.withColumn("stagenum", func.expr("substring(stage, 2)")).withColumn("stagenum", col("stagenum").cast(IntegerType()))
inputdf = inputdf.withColumn("temp", func.when((col("stagenum") - func.lag("stagenum", 1).over(w)).isNull() | \
(col("stagenum") - func.lag("stagenum", 1).over(w) == func.lit(0)) |\
(col("stagenum") - func.lag("stagenum", 1).over(w) == func.lit(1)), func.lit(1)).otherwise(func.lit(100)))
除此之外,我尝试了使用 lag() 的不同方法,但似乎没有一种干净的方法来分配 CycleId。
寻求帮助。
数据
l=[('type1' , 's1' , 'a' , 1),('type1','s2' , 'b' , 1 ),('type1' , 's1' , 'a' , 1),('type1','s2' , 'b' , 1 ), ('type1' , 's2' , 'c' , 1), ('type1' , 's3' , 'd' , 1),('type1' , 's1' , 'e' , 1),('type1','s2' , 'f' , 1 ), ('type1' , 's3' , 'g' , 1)]
df=spark.createDataFrame(l,['type' , 'stage' , 'Timestamp', 'CycleID'])
df.show()
解决方案
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import *
df=(
df.withColumn('CycleID',col('stage')=='s1')#Generate Booleans through Selection
.withColumn('CycleID', F.sum(F.col('CycleID').cast('integer'))#Convert Boolean to intergers
.over(Window.partitionBy().orderBy().rowsBetween(-sys.maxsize, 0)))#rowsBetween(-sys.maxsize, 0) along with sum function is used to create cumulative sum of the column
)
df.show()
+-----+-----+---------+-------+
| type|stage|Timestamp|CycleID|
+-----+-----+---------+-------+
|type1| s1| a| 1|
|type1| s2| b| 1|
|type1| s2| c| 1|
|type1| s3| d| 1|
|type1| s1| e| 2|
|type1| s2| f| 2|
|type1| s3| g| 2|
+-----+-----+---------+-------+
根据您的以下评论:
请对 s3
进行降序和布尔 select 排序。下面的代码
df.sort(col('Timestamp').desc()).withColumn('CycleID',(col('stage')=='s3')).withColumn('CycleID', F.sum(F.col('CycleID').cast('integer')).over(Window.partitionBy().orderBy().rowsBetween(-sys.maxsize, 0))).show()
+-----+-----+---------+-------+
| type|stage|Timestamp|CycleID|
+-----+-----+---------+-------+
|type1| s3| g| 1|
|type1| s2| f| 1|
|type1| s1| e| 1|
|type1| s3| d| 2|
|type1| s2| c| 2|
|type1| s2| b| 2|
|type1| s2| b| 2|
|type1| s1| a| 2|
|type1| s1| a| 2|
+-----+-----+---------+-------+
如果你有多个s3.使用滞后如下;
m=Window.partitionBy()#.orderBy(F.desc('Timestamp'))
df1=df.select("*", lag("stage").over(m.orderBy(col("Timestamp"))).alias("CycleID1"))
df1.withColumn('CycleID',(((col('stage')=='s1')&(col('CycleID1').isNull()))|((col('stage')=='s1')&(col('CycleID1')=='s3')))).withColumn('CycleID', F.sum(F.col('CycleID').cast('integer')).over(m.rowsBetween(-sys.maxsize, 0))).drop('CycleID1').show()
+-----+-----+---------+-------+
| type|stage|Timestamp|CycleID|
+-----+-----+---------+-------+
|type1| s1| a| 1|
|type1| s1| a| 1|
|type1| s2| b| 1|
|type1| s2| b| 1|
|type1| s2| c| 1|
|type1| s3| d| 1|
|type1| s1| e| 2|
|type1| s2| f| 2|
|type1| s3| g| 2|
+-----+-----+---------+-------+