如何将 RDD[(String, Iterable[VertexId])] 转换为 DataFrame?

How to convert RDD[(String, Iterable[VertexId])] to DataFrame?

我从 Graphx 创建了一个 RDD,看起来像这样:

val graph = GraphLoader.edgeListFile(spark.sparkContext, fileName)
var s: VertexRDD[VertexId] = graph.connectedComponents().vertices

val nodeGraph: RDD[(String, Iterable[VertexId])] = s.groupBy(_._2) map { case (x, y) =>
  val rand = randomUUID().toString
  val clusterList: Iterable[VertexId] = y.map(_._1)
  (rand, clusterList)
}

nodeGraph的类型是RDD[(String, Iterable[VertexId])],里面的数据会是这样的形式:

(abc-def11, Iterable(1,2,3,4)), 
(def-aaa, Iterable(10,11)), 
...

我现在想做的是从中创建一个数据框,应该如下所示:

col1        col2
abc-def11   1
abc-def11   2
abc-def11   3
abc-def11   4
def-aaa     10
def-aaa     11

如何在 Spark 中执行此操作?

首先,使用 toDF() 将 RDD 转换为数据帧,其中包含您想要的列名。最简单的方法是先将 Iterable[VertexId] 更改为 Seq[Long]

import spark.implicits._
val df = nodeGraph.map(x => (x._1, x._2.map(_.toLong).toSeq)).toDF("col1", "col2")

请注意,这可以在创建时完成 nodeGraph 以节省步骤。接下来,使用 explode 函数来展平数据帧,

val df2 = df.withColumn("col2", explode($"col2"))

这将为您提供所需的输出。