使用 PySpark 根据行值模式对记录进行分组
Grouping records based on a pattern of row values using PySpark
我有一个包含 3 列的 table:
Table A:
+----+----+----------+
|col1|col2|row_number|
+----+----+----------+
| X| 1| 1|
| Y| 0| 2|
| Z| 2| 3|
| A| 1| 4|
| B| 0| 5|
| C| 0| 6|
| D| 2| 7|
| P| 1| 8|
| Q| 2| 9|
+----+----+----------+
我想通过根据 "col2" 值对记录进行分组来连接 "col1" 中的字符串。
"col2" 的模式为 1 后跟任意数量的 0,然后是 2。我想将 "col2" 以 1 开头并以 2 结尾的记录分组(必须保持数据框的顺序- 您可以为订单使用 row_number 列)
例如,前 3 条记录可以组合在一起,因为 "col2" 有“1-0-2”。接下来的 4 条记录可以组合在一起,因为它们的 "col2" 值具有“1-0-0-2”
在我对这些记录进行分组后,可以使用 "concat_ws" 完成连接部分。但是对如何根据“1-0s-2”模式对这些记录进行分组有什么帮助吗?
预期输出:
+----------+
|output_col|
+----------+
| XYZ|
| ABCD|
| PQ|
+----------+
您可以使用以下代码创建此示例数据:
schema = StructType([StructField("col1", StringType())\
,StructField("col2", IntegerType())\
,StructField("row_number", IntegerType())])
data = [['X', 1, 1], ['Y', 0, 2], ['Z', 2, 3], ['A', 1, 4], ['B', 0, 5], ['C', 0, 6], ['D', 2, 7], ['P', 1, 8], ['Q', 2, 9]]
df = spark.createDataFrame(data,schema=schema)
df.show()
我建议您使用 window
函数。首先使用 row_number
排序的 window 得到 col2
的 增量总和 。 incremental sum
将具有 3 的倍数 ,这基本上是您需要的 组的 端点 。将它们替换为相同 window、 的 滞后,以获得 incremental_sum
中的 所需分区 。现在您可以 groupBy incremental_sum
列和 collect_list
。您可以在收集的列表中array_join
(spark2.4),获取您想要的字符串。
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w=Window().orderBy("row_number")
df.withColumn("incremental_sum", F.sum("col2").over(w))\
.withColumn("lag", F.lag("incremental_sum").over(w))\
.withColumn("incremental_sum", F.when(F.col("incremental_sum")%3==0, F.col("lag")).otherwise(F.col("incremental_sum")))\
.groupBy("incremental_sum").agg(F.array_join(F.collect_list("col1"),"").alias("output_col")).drop("incremental_sum").show()
+----------+
|output_col|
+----------+
| XYZ|
| ABCD|
| PQ|
+----------+
我有一个包含 3 列的 table:
Table A:
+----+----+----------+
|col1|col2|row_number|
+----+----+----------+
| X| 1| 1|
| Y| 0| 2|
| Z| 2| 3|
| A| 1| 4|
| B| 0| 5|
| C| 0| 6|
| D| 2| 7|
| P| 1| 8|
| Q| 2| 9|
+----+----+----------+
我想通过根据 "col2" 值对记录进行分组来连接 "col1" 中的字符串。 "col2" 的模式为 1 后跟任意数量的 0,然后是 2。我想将 "col2" 以 1 开头并以 2 结尾的记录分组(必须保持数据框的顺序- 您可以为订单使用 row_number 列)
例如,前 3 条记录可以组合在一起,因为 "col2" 有“1-0-2”。接下来的 4 条记录可以组合在一起,因为它们的 "col2" 值具有“1-0-0-2”
在我对这些记录进行分组后,可以使用 "concat_ws" 完成连接部分。但是对如何根据“1-0s-2”模式对这些记录进行分组有什么帮助吗?
预期输出:
+----------+
|output_col|
+----------+
| XYZ|
| ABCD|
| PQ|
+----------+
您可以使用以下代码创建此示例数据:
schema = StructType([StructField("col1", StringType())\
,StructField("col2", IntegerType())\
,StructField("row_number", IntegerType())])
data = [['X', 1, 1], ['Y', 0, 2], ['Z', 2, 3], ['A', 1, 4], ['B', 0, 5], ['C', 0, 6], ['D', 2, 7], ['P', 1, 8], ['Q', 2, 9]]
df = spark.createDataFrame(data,schema=schema)
df.show()
我建议您使用 window
函数。首先使用 row_number
排序的 window 得到 col2
的 增量总和 。 incremental sum
将具有 3 的倍数 ,这基本上是您需要的 组的 端点 。将它们替换为相同 window、 的 滞后,以获得 incremental_sum
中的 所需分区 。现在您可以 groupBy incremental_sum
列和 collect_list
。您可以在收集的列表中array_join
(spark2.4),获取您想要的字符串。
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w=Window().orderBy("row_number")
df.withColumn("incremental_sum", F.sum("col2").over(w))\
.withColumn("lag", F.lag("incremental_sum").over(w))\
.withColumn("incremental_sum", F.when(F.col("incremental_sum")%3==0, F.col("lag")).otherwise(F.col("incremental_sum")))\
.groupBy("incremental_sum").agg(F.array_join(F.collect_list("col1"),"").alias("output_col")).drop("incremental_sum").show()
+----------+
|output_col|
+----------+
| XYZ|
| ABCD|
| PQ|
+----------+