Merge multiple RDDs in a specific order
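I am trying to merge multiple RDDs of strings into a single RDD of rows, in a specific order. I tried creating a Map[String, RDD[Seq[String]]] (where each Seq contains only one element) and then merging them into an RDD[Row[String]], but it doesn't seem to work (the content is lost). Does anyone have any ideas?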
val t1: StructType
val mapFields: Map[String, RDD[Seq[String]]]
var ordRDD: RDD[Seq[String]] = context.emptyRDD
t1.foreach(field => ordRDD = ordRDD ++ mapFields(field.name))
val rdd = ordRDD.map(line => Row.fromSeq(line))
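Edit:
Using the zip function raises a Spark exception because my RDDs do not have the same number of elements in each partition. I don't know how to make sure they all have the same number of elements per partition, so I just zipped each of them with an index and then joined them, using a ListMap to keep the fields in the right order. Maybe there is a trick with the mapPartitions function, but I don't know the Spark API well enough yet.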
val mapFields: Map[String, RDD[String]]
var ord: ListMap[String, RDD[String]] = ListMap()
t1.foreach(field => ord = ord ++ Map(field.name -> mapFields(field.name)))
// Note : zip = SparkException: Can only zip RDDs with same number of elements in each partition
//val rdd: RDD[Row] = ord.toSeq.map(_._2.map(s => Seq(s))).reduceLeft((rdd1, rdd2) => rdd1.zip(rdd2).map{ case (l1, l2) => l1 ++ l2 }).map(Row.fromSeq)
val zipRdd = ord.toSeq.map(_._2.map(s => Seq(s)).zipWithIndex().map{ case (d, i) => (i, d) })
val concatRdd = zipRdd.reduceLeft((rdd1, rdd2) => rdd1.join(rdd2).map{ case (i, (l1, l2)) => (i, l1 ++ l2)})
val rowRdd: RDD[Row] = concatRdd.map{ case (i, d) => Row.fromSeq(d) }
val df1 = spark.createDataFrame(rowRdd, t1)
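One thing to watch with the index-based join above: join shuffles records by key, so the joined rows are not guaranteed to come back in their original order. Below is a minimal sketch of the same idea with an explicit sort on the index before it is dropped. It assumes a local SparkSession; the object name ZipByIndexSketch and the helper zipByIndex are hypothetical names used only for illustration, not part of Spark.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

object ZipByIndexSketch {
  // Hypothetical helper (not a Spark API): combines column RDDs positionally,
  // even when their partitions hold different numbers of elements.
  def zipByIndex(columns: Seq[RDD[String]]): RDD[Row] =
    columns
      // Key every value by its global position in its own RDD
      .map(_.zipWithIndex().map { case (value, idx) => (idx, Seq(value)) })
      // Join column by column on the position, appending the new value
      .reduceLeft((left, right) => left.join(right).mapValues { case (l, r) => l ++ r })
      // join does not preserve order, so sort by position before dropping it
      .sortByKey()
      .values
      .map(Row.fromSeq)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("zip-by-index").getOrCreate()
    val sc = spark.sparkContext

    // Different partition counts on purpose: RDD.zip would reject these
    val letters = sc.parallelize(Seq("a", "b", "c"), 2)
    val digits  = sc.parallelize(Seq("1", "2", "3"), 3)

    zipByIndex(Seq(letters, digits)).collect().foreach(println)
    spark.stop()
  }
}
```

The join and the sort each trigger a shuffle, so this is noticeably more expensive than RDD.zip; it is mainly worth it when the RDDs cannot be given identical partitioning.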
The key here is to use RDD.zip to "zip" the RDDs together (creating an RDD in which each record is the combination of the records with the same index in all RDDs):
import org.apache.spark.sql._
import org.apache.spark.sql.types._
// INPUT: Map does not preserve order (not the default implementation, at least) - using Seq
val rdds: Seq[(String, RDD[String])] = Seq(
"field1" -> sc.parallelize(Seq("a", "b", "c")),
"field2" -> sc.parallelize(Seq("1", "2", "3")),
"field3" -> sc.parallelize(Seq("Q", "W", "E"))
)
// Use RDD.zip to zip all RDDs together, then convert to Rows
val rowRdd: RDD[Row] = rdds
.map(_._2)
.map(_.map(s => Seq(s)))
.reduceLeft((rdd1, rdd2) => rdd1.zip(rdd2).map { case (l1, l2) => l1 ++ l2 })
.map(Row.fromSeq)
// Create schema using the column names:
val schema: StructType = StructType(rdds.map(_._1).map(name => StructField(name, StringType)))
// Create DataFrame:
val result: DataFrame = spark.createDataFrame(rowRdd, schema)
result.show
// +------+------+------+
// |field1|field2|field3|
// +------+------+------+
// | a| 1| Q|
// | b| 2| W|
// | c| 3| E|
// +------+------+------+
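RDD.zip assumes that both RDDs have the same number of partitions and the same number of elements in each partition. That holds for the sample data above, because every RDD is built by sc.parallelize from a Seq of the same length with the same parallelism, but it can break after transformations such as filter. A small sketch for checking this before zipping follows. It assumes a local SparkSession, and the object name PartitionSizesSketch and the helper partitionSizes are hypothetical names, not Spark APIs.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object PartitionSizesSketch {
  // Hypothetical helper: number of elements in each partition, in partition order.
  def partitionSizes[T](rdd: RDD[T]): Seq[Int] =
    rdd.mapPartitions(it => Iterator(it.size)).collect().toSeq

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("partition-sizes").getOrCreate()
    val sc = spark.sparkContext

    val left   = sc.parallelize(Seq("a", "b", "c", "d"), 2)
    val right  = sc.parallelize(Seq("1", "2", "3", "4"), 2)
    // filter keeps the partition count but changes the per-partition sizes
    val skewed = right.filter(_ != "1")

    // Safe to zip only when the per-partition sizes match exactly
    println(partitionSizes(left) == partitionSizes(right))
    println(partitionSizes(left) == partitionSizes(skewed))
    spark.stop()
  }
}
```

If the sizes differ, zip fails with the SparkException quoted in the question, and an index-based join (zipWithIndex followed by join) is one way around it.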