PySpark:如何获得列的所有组合
PySpark: how to get all combinations of columns
我有一个包含批次、输入和输出组合的 DF,我希望能够将它们的“唯一组合”添加回 DataFrame。数据的简单表示如下所示:
Batch
Output
Input
1
A
X
1
A
Y
1
A
Z
2
A
X
2
A
Y
2
A
Z
3
A
V
3
A
Y
3
A
Z
4
A
W
4
A
Y
4
A
Z
正如您所见,有 4 个批次和 3 个不同的输入组合来生成相同的输出类型,我希望最终得到的结果是:
Batch
Output
Input
Combination
1
A
X
1
1
A
Y
1
1
A
Z
1
2
A
X
1
2
A
Y
1
2
A
Z
1
3
A
V
2
3
A
Y
2
3
A
Z
2
4
A
W
3
4
A
Y
3
4
A
Z
3
我希望在 PySpark 中实现它以进行进一步的数据操作,任何指导将不胜感激:)
编辑:仍然不够优雅,但它在 PySpark 中有效!我相信一定有一种更简单的方法可以使用集合或字典来做到这一点,我的大脑只是拒绝让我看到它...
df = spark.createDataFrame(
[
(1,'A','X'),
(1,'A','Y'),
(1,'A','Z'),
(2,'A','X'),
(2,'A','Y'),
(2,'A','Z'),
(3,'A','V'),
(3,'A','Y'),
(3,'A','Z'),
(4,'A','W'),
(4,'A','Y'),
(4,'A','Z'),
(5,'B','X'),
(5,'B','Y'),
(5,'B','Z')
],
["Batch", "Output", "Input"]
)
grouped = df.orderBy("Input").groupBy(["Batch", "Output"]).agg(f.concat_ws('_', f.sort_array(f.collect_list("Input"))).alias("Comb"))
grouped = grouped.withColumn("TotalComb", f.concat_ws('_',grouped.Output, grouped.Comb))
w = Window.partitionBy().orderBy(f.col('TotalComb').asc())
groupunique = grouped[["totalComb"]].distinct().withColumn("UniqueComb", f.row_number().over(w))
connected = df.join(grouped, on = ["Batch", "Output"], how = "left").join(groupunique, on = ["totalComb"], how = "left")
创建一个输入列表,按该列表分类,找到连续差异并使用它们创建值以对整个 df 进行累积求和
w=Window.partitionBy("Batch","Output").orderBy("Batch")
df1=(df.withColumn('Combination',collect_set("Input").over(w))
.withColumn('Combination',sum(when(lag('Output').over(Window.partitionBy("Combination",'Output').orderBy("Batch")).isNull(),1)
.otherwise(0)).over(Window.partitionBy().orderBy('Batch')))).show()
+-----+------+-----+-----------+
|Batch|Output|Input|Combination|
+-----+------+-----+-----------+
| 1| A| X| 1|
| 1| A| Y| 1|
| 1| A| Z| 1|
| 2| A| X| 1|
| 2| A| Y| 1|
| 2| A| Z| 1|
| 3| A| V| 2|
| 3| A| Y| 2|
| 3| A| Z| 2|
| 4| A| W| 3|
| 4| A| Y| 3|
| 4| A| Z| 3|
| 5| B| X| 4|
| 5| B| Y| 4|
| 5| B| Z| 4|
+-----+------+-----+-----------+
我有一个包含批次、输入和输出组合的 DF,我希望能够将它们的“唯一组合”添加回 DataFrame。数据的简单表示如下所示:
Batch | Output | Input |
---|---|---|
1 | A | X |
1 | A | Y |
1 | A | Z |
2 | A | X |
2 | A | Y |
2 | A | Z |
3 | A | V |
3 | A | Y |
3 | A | Z |
4 | A | W |
4 | A | Y |
4 | A | Z |
正如您所见,有 4 个批次和 3 个不同的输入组合来生成相同的输出类型,我希望最终得到的结果是:
Batch | Output | Input | Combination |
---|---|---|---|
1 | A | X | 1 |
1 | A | Y | 1 |
1 | A | Z | 1 |
2 | A | X | 1 |
2 | A | Y | 1 |
2 | A | Z | 1 |
3 | A | V | 2 |
3 | A | Y | 2 |
3 | A | Z | 2 |
4 | A | W | 3 |
4 | A | Y | 3 |
4 | A | Z | 3 |
我希望在 PySpark 中实现它以进行进一步的数据操作,任何指导将不胜感激:)
编辑:仍然不够优雅,但它在 PySpark 中有效!我相信一定有一种更简单的方法可以使用集合或字典来做到这一点,我的大脑只是拒绝让我看到它...
df = spark.createDataFrame(
[
(1,'A','X'),
(1,'A','Y'),
(1,'A','Z'),
(2,'A','X'),
(2,'A','Y'),
(2,'A','Z'),
(3,'A','V'),
(3,'A','Y'),
(3,'A','Z'),
(4,'A','W'),
(4,'A','Y'),
(4,'A','Z'),
(5,'B','X'),
(5,'B','Y'),
(5,'B','Z')
],
["Batch", "Output", "Input"]
)
grouped = df.orderBy("Input").groupBy(["Batch", "Output"]).agg(f.concat_ws('_', f.sort_array(f.collect_list("Input"))).alias("Comb"))
grouped = grouped.withColumn("TotalComb", f.concat_ws('_',grouped.Output, grouped.Comb))
w = Window.partitionBy().orderBy(f.col('TotalComb').asc())
groupunique = grouped[["totalComb"]].distinct().withColumn("UniqueComb", f.row_number().over(w))
connected = df.join(grouped, on = ["Batch", "Output"], how = "left").join(groupunique, on = ["totalComb"], how = "left")
创建一个输入列表,按该列表分类,找到连续差异并使用它们创建值以对整个 df 进行累积求和
w=Window.partitionBy("Batch","Output").orderBy("Batch")
df1=(df.withColumn('Combination',collect_set("Input").over(w))
.withColumn('Combination',sum(when(lag('Output').over(Window.partitionBy("Combination",'Output').orderBy("Batch")).isNull(),1)
.otherwise(0)).over(Window.partitionBy().orderBy('Batch')))).show()
+-----+------+-----+-----------+
|Batch|Output|Input|Combination|
+-----+------+-----+-----------+
| 1| A| X| 1|
| 1| A| Y| 1|
| 1| A| Z| 1|
| 2| A| X| 1|
| 2| A| Y| 1|
| 2| A| Z| 1|
| 3| A| V| 2|
| 3| A| Y| 2|
| 3| A| Z| 2|
| 4| A| W| 3|
| 4| A| Y| 3|
| 4| A| Z| 3|
| 5| B| X| 4|
| 5| B| Y| 4|
| 5| B| Z| 4|
+-----+------+-----+-----------+