RDD[(String,Iterable[GenericData.Record])] 到 Map[(String,RDD[GenericData.Record])]
RDD[(String,Iterable[GenericData.Record])] to Map[(String,RDD[GenericData.Record])]
我有一个 RDD
类型的 (String,Iterable[GenericData.Record])
。现在我想根据这个 RDD 的键将这些可迭代对象保存到路径中。因此,例如,如果 RDD 包含
("a",[1,2,3,4])
("b",[5,6,7,9])
我需要在result-path/a
下坚持[1,2,3,4],在result-path/b
下坚持[5,6,7,8,9]。执行此操作的一种方法 - 编译但在运行时失败 - 如下:
implicit val spark: SparkSession = SparkSessionUtils.initSparkSession("Test")
implicit val sc: SparkContext = spark.sparkContext
val re:RDD[(String,Iterable[GenericData.Record])] = ???
val hadoopConf = new Configuration(sc.hadoopConfiguration)
re.forearch {
case (key,collection) =>
val reRDD = sc.makeRDD(collection)
reRDD.saveAsNewAPIHadoopFile(s"$uri/$key",
classOf[SpecificRecord],
classOf[NullWritable],
classOf[AvroKeyOutputFormat[SpecificRecord]],
hadoopConf)
}
这里的问题是我无法执行此操作,因为 SparkContext 不可序列化。re
是一个 RDD
,因此对其调用 foreach
必须序列化内部 lambda 并发送到工作节点。所以我试图想办法将初始 re
转换为 Map[(String,RDD[GenericData.Record])]
以便我可以执行以下操作:
implicit val spark: SparkSession = SparkSessionUtils.initSparkSession("Test")
implicit val sc: SparkContext = spark.sparkContext
val re:Map[(String,RDD[GenericData.Record])] = ???
val hadoopConf = new Configuration(sc.hadoopConfiguration)
re.forearch {
case (key,rddCollection) =>
rddCollection.saveAsNewAPIHadoopFile(s"$uri/$key",
classOf[SpecificRecord],
classOf[NullWritable],
classOf[AvroKeyOutputFormat[SpecificRecord]],
hadoopConf)
}
可以收集键,并为每个键过滤原始RDD:
val re = rdd
.keys
.collect()
.map(v => v -> rdd.filter(_._1 == v).values)
.toMap
我有一个 RDD
类型的 (String,Iterable[GenericData.Record])
。现在我想根据这个 RDD 的键将这些可迭代对象保存到路径中。因此,例如,如果 RDD 包含
("a",[1,2,3,4])
("b",[5,6,7,9])
我需要在result-path/a
下坚持[1,2,3,4],在result-path/b
下坚持[5,6,7,8,9]。执行此操作的一种方法 - 编译但在运行时失败 - 如下:
implicit val spark: SparkSession = SparkSessionUtils.initSparkSession("Test")
implicit val sc: SparkContext = spark.sparkContext
val re:RDD[(String,Iterable[GenericData.Record])] = ???
val hadoopConf = new Configuration(sc.hadoopConfiguration)
re.forearch {
case (key,collection) =>
val reRDD = sc.makeRDD(collection)
reRDD.saveAsNewAPIHadoopFile(s"$uri/$key",
classOf[SpecificRecord],
classOf[NullWritable],
classOf[AvroKeyOutputFormat[SpecificRecord]],
hadoopConf)
}
这里的问题是我无法执行此操作,因为 SparkContext 不可序列化。re
是一个 RDD
,因此对其调用 foreach
必须序列化内部 lambda 并发送到工作节点。所以我试图想办法将初始 re
转换为 Map[(String,RDD[GenericData.Record])]
以便我可以执行以下操作:
implicit val spark: SparkSession = SparkSessionUtils.initSparkSession("Test")
implicit val sc: SparkContext = spark.sparkContext
val re:Map[(String,RDD[GenericData.Record])] = ???
val hadoopConf = new Configuration(sc.hadoopConfiguration)
re.forearch {
case (key,rddCollection) =>
rddCollection.saveAsNewAPIHadoopFile(s"$uri/$key",
classOf[SpecificRecord],
classOf[NullWritable],
classOf[AvroKeyOutputFormat[SpecificRecord]],
hadoopConf)
}
可以收集键,并为每个键过滤原始RDD:
val re = rdd
.keys
.collect()
.map(v => v -> rdd.filter(_._1 == v).values)
.toMap