java.io.IOException: 不是数据文件

java.io.IOException: Not a data file

我正在处理一堆存储在 HDFS 嵌套目录结构中的 avro 文件。这些文件存储在 year/month/day/hour 格式的目录结构中。

我写了这个简单的代码来处理

sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true")
val rootDir = "/user/cloudera/rootDir"
val rdd1 = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](rootDir)
rdd1.count()

我在下面粘贴了一个异常。我面临的最大问题是它没有告诉我哪个文件不是数据文件。所以我将不得不进入 HDFS 并扫描 1000 个文件以查看哪个不是数据文件。

有没有更有效的方法debug/solve?

5/11/01 19:01:49 WARN TaskSetManager: Lost task 1084.0 in stage 14.0 (TID 11562, datanode): java.io.IOException: Not a data file.
    at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:102)
    at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:97)
    at org.apache.avro.mapreduce.AvroRecordReaderBase.createAvroFileReader(AvroRecordReaderBase.java:183)
    at org.apache.avro.mapreduce.AvroRecordReaderBase.initialize(AvroRecordReaderBase.java:94)
    at org.apache.spark.rdd.NewHadoopRDD$$anon.<init>(NewHadoopRDD.scala:133)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

块所在的集群中的一个节点已关闭。因此找不到数据,这会导致错误。解决方案是修复并启动集群中的所有节点。

我的 Java 使用 avro 输入的 map reduce 程序得到了下面的确切错误。以下是问题的运行摘要。

Error: java.io.IOException: Not a data file.    at
org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:102)
at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:97)
at org.apache.avro.mapreduce.AvroRecordReaderBase.createAvroFileReader(AvroRecordReaderBase.java:183)   at
org.apache.avro.mapreduce.AvroRecordReaderBase.initialize(AvroRecordReaderBase.java:94) at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:548)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:786)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)   at
org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:168)    at
 java.security.AccessController.doPrivileged(Native Method)     at javax.security.auth.Subject.doAs(Subject.java:422)   at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

我决定对该文件进行 cat,因为我能够 运行 该程序覆盖 HDFS 同一文件夹中的另一个文件并收到以下内容。

INFO hdfs.DFSClient: No node available for <Block location in your
cluster> from any node: java.io.IOException: No live nodes contain
 block BP-6168826450-10.1.10.123-1457116155679:blk_1073853378_112574
 after checking nodes = [], ignoredNodes = null No live nodes contain
 current block Block locations: Dead nodes: . Will get new block
 locations from namenode and retry...

我们的集群一直存在一些问题,不幸的是一些节点已关闭。解决问题后,此错误已解决

我在 map reduce 作业中读取 avro 文件时遇到了同样的错误。稍微调查一下,我发现 MapTasks 失败的 avro 文件都是零字节 avro 文件。看起来 MapReduce 无法处理零字节文件。

在我的例子中,我试图使用 DataFileReader 读取数据,它希望数据采用某种格式(使用 DataFileWriter 编写),但我的数据文件是手工制作的,所以我是收到此错误。

我通过使用 JsonDecoder 将模式和 Avro 记录作为参数和 returns 解码器来解决这个问题。这个解码器可以与 GenericDatumReader 一起使用来读取你的 GenericRecord。这是供您参考的 Scala 代码。

    val avroJson = Source.fromURL(getClass.getResource("/record.avro")).mkString
    val decoderFactory: DecoderFactory = new DecoderFactory
    val decoder: Decoder = decoderFactory.jsonDecoder(schema, avroJson)

    val datumReader = new GenericDatumReader[GenericRecord](schema)
    var avroRecord: GenericRecord = datumReader.read(null, decoder)

HTH.