如何在 Spark 中使用两列和两个方向进行分组

How to groupBy in Spark using two columns and in both directions

我想基于两个方向的两列对我的数据框元素进行分组。 这是使用过的数据帧的示例

val columns = Seq("src","dst")
val data = Seq(("A", "B"), ("B", "C"), ("C", "A"),("A", "B"), ("B", "A"), ("B", "A"),("A", "C"), ("B", "A"), ("C", "D"),("D", "C"), ("A", "C"), ("C", "A"))
val rdd = spark.sparkContext.parallelize(data)
val dff = spark.createDataFrame(rdd).toDF(columns:_*)

当我在两列上使用简单的 groupBy 时,我得到了这个结果

dff.groupBy("src","dst").count().show()
+---+---+-----+
|src|dst|count|
+---+---+-----+
|  B|  C|    1|
|  D|  C|    1|
|  A|  C|    2|
|  C|  A|    2|
|  C|  D|    1|
|  B|  A|    3|
|  A|  B|    2|
+---+---+-----+

我想要分组列,其中 srcdst 在另一个方向相同(例如分组 A,C C,A一起,A,BB,A 在一起...)。 想要的结果就是这样

+---+---+-----+
|src|dst|count|
+---+---+-----+
|  B|  C|    1|
|  D|  C|    2|
|  A|  C|    4|
|  B|  A|    5|
+---+---+-----+

有什么解决办法吗?

您可以创建一个新的数组列 array 包含您的两个按列分组,对该数组进行排序并按该数组列分组,如下所示:

import org.apache.spark.sql.functions.{array, array_sort, col, count, first}

val result = dff.withColumn("group", array_sort(array(col("src"), col("dst"))))
  .groupBy("group")
  .agg(first("src").as("src"), first("dst").as("dst"), count("group").as("count"))
  .drop("group")

result数据框如下:

+---+---+-----+
|src|dst|count|
+---+---+-----+
|A  |B  |5    |
|C  |A  |4    |
|B  |C  |1    |
|C  |D  |2    |
+---+---+-----+

如果您没有 array_sort 方法(在 spark 2.4 中可用),您可以使用 when 条件对 srcdst 两列重新排序,如下:

import org.apache.spark.sql.functions.{col, when}

val result = dff
  .withColumn("first", when(col("dst") < col("src"), col("src")).otherwise(col("dst")))
  .withColumn("second", when(col("dst") >= col("src"), col("src")).otherwise(col("dst")))
  .drop("src", "dst")
  .withColumnRenamed("first", "src")
  .withColumnRenamed("second", "dst")
  .groupBy("src", "dst")
  .count()

但是,第二种方法只适用于两列

另一种不使用数组的方法,您可以按 greatestleast 函数分组,如下所示:

dff.groupBy(
    least($"src", $"dst").as("src"),
    greatest($"src", $"dst").as("dst"),
).count().show

//+---+---+-----+
//|src|dst|count|
//+---+---+-----+
//|  B|  C|    1|
//|  A|  C|    4|
//|  C|  D|    2|
//|  A|  B|    5|
//+---+---+-----+

如果您希望最大值位于 src 列中,则可以更改 srcdst 的顺序。