如何将 RDD[(String, Iterable[VertexId])] 转换为 DataFrame?
How to convert RDD[(String, Iterable[VertexId])] to DataFrame?
我从 Graphx
创建了一个 RDD
,看起来像这样:
val graph = GraphLoader.edgeListFile(spark.sparkContext, fileName)
var s: VertexRDD[VertexId] = graph.connectedComponents().vertices
val nodeGraph: RDD[(String, Iterable[VertexId])] = s.groupBy(_._2) map { case (x, y) =>
val rand = randomUUID().toString
val clusterList: Iterable[VertexId] = y.map(_._1)
(rand, clusterList)
}
nodeGraph
的类型是RDD[(String, Iterable[VertexId])]
,里面的数据会是这样的形式:
(abc-def11, Iterable(1,2,3,4)),
(def-aaa, Iterable(10,11)),
...
我现在想做的是从中创建一个数据框,应该如下所示:
col1 col2
abc-def11 1
abc-def11 2
abc-def11 3
abc-def11 4
def-aaa 10
def-aaa 11
如何在 Spark 中执行此操作?
首先,使用 toDF()
将 RDD 转换为数据帧,其中包含您想要的列名。最简单的方法是先将 Iterable[VertexId]
更改为 Seq[Long]
。
import spark.implicits._
val df = nodeGraph.map(x => (x._1, x._2.map(_.toLong).toSeq)).toDF("col1", "col2")
请注意,这可以在创建时完成 nodeGraph
以节省步骤。接下来,使用 explode
函数来展平数据帧,
val df2 = df.withColumn("col2", explode($"col2"))
这将为您提供所需的输出。
我从 Graphx
创建了一个 RDD
,看起来像这样:
val graph = GraphLoader.edgeListFile(spark.sparkContext, fileName)
var s: VertexRDD[VertexId] = graph.connectedComponents().vertices
val nodeGraph: RDD[(String, Iterable[VertexId])] = s.groupBy(_._2) map { case (x, y) =>
val rand = randomUUID().toString
val clusterList: Iterable[VertexId] = y.map(_._1)
(rand, clusterList)
}
nodeGraph
的类型是RDD[(String, Iterable[VertexId])]
,里面的数据会是这样的形式:
(abc-def11, Iterable(1,2,3,4)),
(def-aaa, Iterable(10,11)),
...
我现在想做的是从中创建一个数据框,应该如下所示:
col1 col2
abc-def11 1
abc-def11 2
abc-def11 3
abc-def11 4
def-aaa 10
def-aaa 11
如何在 Spark 中执行此操作?
首先,使用 toDF()
将 RDD 转换为数据帧,其中包含您想要的列名。最简单的方法是先将 Iterable[VertexId]
更改为 Seq[Long]
。
import spark.implicits._
val df = nodeGraph.map(x => (x._1, x._2.map(_.toLong).toSeq)).toDF("col1", "col2")
请注意,这可以在创建时完成 nodeGraph
以节省步骤。接下来,使用 explode
函数来展平数据帧,
val df2 = df.withColumn("col2", explode($"col2"))
这将为您提供所需的输出。