合并不会减少我的输出文件数量
coalesce does not reduce my number of output files
我在 HDFS
.
上有一个管理 RDD[SpecificRecordBase]
的 spark 作业
我的问题是它生成了很多文件,包括 95% 的空 avro 文件。
我尝试使用 coalesce 来减少我的 RDD 上的分区数量,以及我的输出文件的数量,但它没有任何效果。
def write(data: RDD[SpecificRecordBase]) = {
data.coalesce(1, false) //has no effect
val conf = new Configuration()
val job = new org.apache.hadoop.mapreduce.Job(conf)
AvroJob.setOutputKeySchema(job, schema)
val pair = new PairRDDFunctions(rdd)
pair.saveAsNewAPIHadoopFile(
outputAvroDataPath,
classOf[AvroKey[SpecificRecordBase]],
classOf[org.apache.hadoop.io.NullWritable],
classOf[AvroKeyOutputFormat[SpecificRecordBase]],
job.getConfiguration)
}
我想 rdd
分区配置和 HDFS
分区之间丢失了一些东西,也许 saveAsNewAPIHadoopFile
没有考虑到它,但我不确定。
我错过了什么吗?
有人可以根据 rdd 分区解释调用 saveAsNewAPIHadoopFile
时真正追加的内容吗?
感谢@0x0FFF 回答我自己的问题,正确的代码应该是:
def write(data: RDD[SpecificRecordBase]) = {
val rdd = data.map(t => (new AvroKey(t), org.apache.hadoop.io.NullWritable.get))
val rdd1Partition = rdd.coalesce(1, false) //change nb of partitions to 1
val conf = new Configuration()
val job = new org.apache.hadoop.mapreduce.Job(conf)
AvroJob.setOutputKeySchema(job, schema)
val pair = new PairRDDFunctions(rdd1Partition) //so only one file will be in output
pair.saveAsNewAPIHadoopFile(
outputAvroDataPath,
classOf[AvroKey[SpecificRecordBase]],
classOf[org.apache.hadoop.io.NullWritable],
classOf[AvroKeyOutputFormat[SpecificRecordBase]],
job.getConfiguration)
}
再次感谢!
我在 HDFS
.
RDD[SpecificRecordBase]
的 spark 作业
我的问题是它生成了很多文件,包括 95% 的空 avro 文件。 我尝试使用 coalesce 来减少我的 RDD 上的分区数量,以及我的输出文件的数量,但它没有任何效果。
def write(data: RDD[SpecificRecordBase]) = {
data.coalesce(1, false) //has no effect
val conf = new Configuration()
val job = new org.apache.hadoop.mapreduce.Job(conf)
AvroJob.setOutputKeySchema(job, schema)
val pair = new PairRDDFunctions(rdd)
pair.saveAsNewAPIHadoopFile(
outputAvroDataPath,
classOf[AvroKey[SpecificRecordBase]],
classOf[org.apache.hadoop.io.NullWritable],
classOf[AvroKeyOutputFormat[SpecificRecordBase]],
job.getConfiguration)
}
我想 rdd
分区配置和 HDFS
分区之间丢失了一些东西,也许 saveAsNewAPIHadoopFile
没有考虑到它,但我不确定。
我错过了什么吗?
有人可以根据 rdd 分区解释调用 saveAsNewAPIHadoopFile
时真正追加的内容吗?
感谢@0x0FFF 回答我自己的问题,正确的代码应该是:
def write(data: RDD[SpecificRecordBase]) = {
val rdd = data.map(t => (new AvroKey(t), org.apache.hadoop.io.NullWritable.get))
val rdd1Partition = rdd.coalesce(1, false) //change nb of partitions to 1
val conf = new Configuration()
val job = new org.apache.hadoop.mapreduce.Job(conf)
AvroJob.setOutputKeySchema(job, schema)
val pair = new PairRDDFunctions(rdd1Partition) //so only one file will be in output
pair.saveAsNewAPIHadoopFile(
outputAvroDataPath,
classOf[AvroKey[SpecificRecordBase]],
classOf[org.apache.hadoop.io.NullWritable],
classOf[AvroKeyOutputFormat[SpecificRecordBase]],
job.getConfiguration)
}
再次感谢!