合并不会减少我的输出文件数量

coalesce does not reduce my number of output files

我在 HDFS.

上有一个管理 RDD[SpecificRecordBase] 的 spark 作业

我的问题是它生成了很多文件,包括 95% 的空 avro 文件。 我尝试使用 coalesce 来减少我的 RDD 上的分区数量,以及我的输出文件的数量,但它没有任何效果。

 def write(data: RDD[SpecificRecordBase]) = {
   data.coalesce(1, false)    //has no effect
   val conf = new Configuration()
   val job = new org.apache.hadoop.mapreduce.Job(conf)

   AvroJob.setOutputKeySchema(job, schema)
   val pair = new PairRDDFunctions(rdd)
   pair.saveAsNewAPIHadoopFile(
     outputAvroDataPath,
     classOf[AvroKey[SpecificRecordBase]],
     classOf[org.apache.hadoop.io.NullWritable],
     classOf[AvroKeyOutputFormat[SpecificRecordBase]],
     job.getConfiguration)
}

我想 rdd 分区配置和 HDFS 分区之间丢失了一些东西,也许 saveAsNewAPIHadoopFile 没有考虑到它,但我不确定。

我错过了什么吗?

有人可以根据 rdd 分区解释调用 saveAsNewAPIHadoopFile 时真正追加的内容吗?

感谢@0x0FFF 回答我自己的问题,正确的代码应该是:

    def write(data: RDD[SpecificRecordBase]) = {
           val rdd = data.map(t => (new AvroKey(t), org.apache.hadoop.io.NullWritable.get))
           val rdd1Partition = rdd.coalesce(1, false)  //change nb of partitions to 1

           val conf = new Configuration()
           val job = new org.apache.hadoop.mapreduce.Job(conf)

           AvroJob.setOutputKeySchema(job, schema)
           val pair = new PairRDDFunctions(rdd1Partition) //so only one file will be in output
           pair.saveAsNewAPIHadoopFile(
             outputAvroDataPath,
             classOf[AvroKey[SpecificRecordBase]],
             classOf[org.apache.hadoop.io.NullWritable],
             classOf[AvroKeyOutputFormat[SpecificRecordBase]],
             job.getConfiguration)
        }

再次感谢!