将 Spark 中的行与现有列一起展平

Flattening rows in Spark along with existing columns

我有如下数据集。

id1   k1, k2, k3, k4
id2   k1, k2
id3   k2, k3
id4   k4

我想计算每个 "k" 所在的行数以及它所在的 ID。

输出:

k1  2    id1, id2
k2  3    id1, id2, id3
k3  2    id1, id3
k4  2    id1, id4

我使用了 explode,然后按键分组,我得到了以下输出。

val newlines = sparkSession.read.textFile(s3Path)
.map(ke => {
            val split = ke.split("\t")
            (split(0), split(1).toString.split(", "))
    })

val myDF = newlines.withColumn("Key", explode($"_3")).groupBy(("Key"))
    .agg(count("Key"))

k1  2    
k2  3   
k3  2 
k4  2

有没有办法我也可以添加 ID?

您可以使用 spark inbuilt 函数 split,explode,agg!

示例:

scala> import org.apache.spark.sql.functions._
scala> val df=Seq(("id1","k1,k2,k3,k4"),
                  ("id2","k1,k2"),
                  ("id3","k2,k3"),
                  ("id4","k4"))
              .toDF("a","b")
scala> df.selectExpr("a","explode(split(b,',')) as ex")
         .groupBy('ex) 
         .agg(concat_ws(",",collect_list('a)).alias("b"),
            count("*").alias("cnt"))
         .orderBy('ex)
         .show()

结果:

+---+-----------+---+
| ex|          b|cnt|
+---+-----------+---+
| k1|    id1,id2|  2|
| k2|id1,id2,id3|  3|
| k3|    id1,id3|  2|
| k4|    id1,id4|  2|
+---+-----------+---+