RDD[(String,Iterable[GenericData.Record])] 到 Map[(String,RDD[GenericData.Record])]

RDD[(String,Iterable[GenericData.Record])] to Map[(String,RDD[GenericData.Record])]

我有一个 RDD 类型的 (String,Iterable[GenericData.Record])。现在我想根据这个 RDD 的键将这些可迭代对象保存到路径中。因此,例如,如果 RDD 包含

("a",[1,2,3,4])
("b",[5,6,7,9])

我需要在result-path/a下坚持[1,2,3,4],在result-path/b下坚持[5,6,7,8,9]。执行此操作的一种方法 - 编译但在运行时失败 - 如下:

implicit val spark: SparkSession = SparkSessionUtils.initSparkSession("Test")
implicit val sc: SparkContext = spark.sparkContext
val re:RDD[(String,Iterable[GenericData.Record])] = ???
val hadoopConf = new Configuration(sc.hadoopConfiguration)

re.forearch {
   case (key,collection) =>
       val reRDD = sc.makeRDD(collection)
       reRDD.saveAsNewAPIHadoopFile(s"$uri/$key",
        classOf[SpecificRecord],
        classOf[NullWritable],
        classOf[AvroKeyOutputFormat[SpecificRecord]],
        hadoopConf)

}

这里的问题是我无法执行此操作,因为 SparkContext 不可序列化。re 是一个 RDD,因此对其调用 foreach 必须序列化内部 lambda 并发送到工作节点。所以我试图想办法将初始 re 转换为 Map[(String,RDD[GenericData.Record])] 以便我可以执行以下操作:

implicit val spark: SparkSession = SparkSessionUtils.initSparkSession("Test")
implicit val sc: SparkContext = spark.sparkContext
val re:Map[(String,RDD[GenericData.Record])] = ???
val hadoopConf = new Configuration(sc.hadoopConfiguration)

re.forearch {
   case (key,rddCollection) =>
       rddCollection.saveAsNewAPIHadoopFile(s"$uri/$key",
        classOf[SpecificRecord],
        classOf[NullWritable],
        classOf[AvroKeyOutputFormat[SpecificRecord]],
        hadoopConf)

}

可以收集键,并为每个键过滤原始RDD:

val re = rdd
  .keys
  .collect()
  .map(v => v -> rdd.filter(_._1 == v).values)
  .toMap