如何在 Spark 中使用两列和两个方向进行分组
How to groupBy in Spark using two columns and in both directions
我想基于两个方向的两列对我的数据框元素进行分组。
这是使用过的数据帧的示例
val columns = Seq("src","dst")
val data = Seq(("A", "B"), ("B", "C"), ("C", "A"),("A", "B"), ("B", "A"), ("B", "A"),("A", "C"), ("B", "A"), ("C", "D"),("D", "C"), ("A", "C"), ("C", "A"))
val rdd = spark.sparkContext.parallelize(data)
val dff = spark.createDataFrame(rdd).toDF(columns:_*)
当我在两列上使用简单的 groupBy 时,我得到了这个结果
dff.groupBy("src","dst").count().show()
+---+---+-----+
|src|dst|count|
+---+---+-----+
| B| C| 1|
| D| C| 1|
| A| C| 2|
| C| A| 2|
| C| D| 1|
| B| A| 3|
| A| B| 2|
+---+---+-----+
我想要分组列,其中 src
和 dst
在另一个方向相同(例如分组 A,C 和C,A一起,A,B 和 B,A 在一起...)。
想要的结果就是这样
+---+---+-----+
|src|dst|count|
+---+---+-----+
| B| C| 1|
| D| C| 2|
| A| C| 4|
| B| A| 5|
+---+---+-----+
有什么解决办法吗?
您可以创建一个新的数组列 array
包含您的两个按列分组,对该数组进行排序并按该数组列分组,如下所示:
import org.apache.spark.sql.functions.{array, array_sort, col, count, first}
val result = dff.withColumn("group", array_sort(array(col("src"), col("dst"))))
.groupBy("group")
.agg(first("src").as("src"), first("dst").as("dst"), count("group").as("count"))
.drop("group")
result
数据框如下:
+---+---+-----+
|src|dst|count|
+---+---+-----+
|A |B |5 |
|C |A |4 |
|B |C |1 |
|C |D |2 |
+---+---+-----+
如果您没有 array_sort
方法(在 spark 2.4 中可用),您可以使用 when
条件对 src
和 dst
两列重新排序,如下:
import org.apache.spark.sql.functions.{col, when}
val result = dff
.withColumn("first", when(col("dst") < col("src"), col("src")).otherwise(col("dst")))
.withColumn("second", when(col("dst") >= col("src"), col("src")).otherwise(col("dst")))
.drop("src", "dst")
.withColumnRenamed("first", "src")
.withColumnRenamed("second", "dst")
.groupBy("src", "dst")
.count()
但是,第二种方法只适用于两列
另一种不使用数组的方法,您可以按 greatest
和 least
函数分组,如下所示:
dff.groupBy(
least($"src", $"dst").as("src"),
greatest($"src", $"dst").as("dst"),
).count().show
//+---+---+-----+
//|src|dst|count|
//+---+---+-----+
//| B| C| 1|
//| A| C| 4|
//| C| D| 2|
//| A| B| 5|
//+---+---+-----+
如果您希望最大值位于 src
列中,则可以更改 src
和 dst
的顺序。
我想基于两个方向的两列对我的数据框元素进行分组。 这是使用过的数据帧的示例
val columns = Seq("src","dst")
val data = Seq(("A", "B"), ("B", "C"), ("C", "A"),("A", "B"), ("B", "A"), ("B", "A"),("A", "C"), ("B", "A"), ("C", "D"),("D", "C"), ("A", "C"), ("C", "A"))
val rdd = spark.sparkContext.parallelize(data)
val dff = spark.createDataFrame(rdd).toDF(columns:_*)
当我在两列上使用简单的 groupBy 时,我得到了这个结果
dff.groupBy("src","dst").count().show()
+---+---+-----+
|src|dst|count|
+---+---+-----+
| B| C| 1|
| D| C| 1|
| A| C| 2|
| C| A| 2|
| C| D| 1|
| B| A| 3|
| A| B| 2|
+---+---+-----+
我想要分组列,其中 src
和 dst
在另一个方向相同(例如分组 A,C 和C,A一起,A,B 和 B,A 在一起...)。
想要的结果就是这样
+---+---+-----+
|src|dst|count|
+---+---+-----+
| B| C| 1|
| D| C| 2|
| A| C| 4|
| B| A| 5|
+---+---+-----+
有什么解决办法吗?
您可以创建一个新的数组列 array
包含您的两个按列分组,对该数组进行排序并按该数组列分组,如下所示:
import org.apache.spark.sql.functions.{array, array_sort, col, count, first}
val result = dff.withColumn("group", array_sort(array(col("src"), col("dst"))))
.groupBy("group")
.agg(first("src").as("src"), first("dst").as("dst"), count("group").as("count"))
.drop("group")
result
数据框如下:
+---+---+-----+
|src|dst|count|
+---+---+-----+
|A |B |5 |
|C |A |4 |
|B |C |1 |
|C |D |2 |
+---+---+-----+
如果您没有 array_sort
方法(在 spark 2.4 中可用),您可以使用 when
条件对 src
和 dst
两列重新排序,如下:
import org.apache.spark.sql.functions.{col, when}
val result = dff
.withColumn("first", when(col("dst") < col("src"), col("src")).otherwise(col("dst")))
.withColumn("second", when(col("dst") >= col("src"), col("src")).otherwise(col("dst")))
.drop("src", "dst")
.withColumnRenamed("first", "src")
.withColumnRenamed("second", "dst")
.groupBy("src", "dst")
.count()
但是,第二种方法只适用于两列
另一种不使用数组的方法,您可以按 greatest
和 least
函数分组,如下所示:
dff.groupBy(
least($"src", $"dst").as("src"),
greatest($"src", $"dst").as("dst"),
).count().show
//+---+---+-----+
//|src|dst|count|
//+---+---+-----+
//| B| C| 1|
//| A| C| 4|
//| C| D| 2|
//| A| B| 5|
//+---+---+-----+
如果您希望最大值位于 src
列中,则可以更改 src
和 dst
的顺序。