Hadoop mapper class is not executed when parsing XML with XmlInputFormat

I am new to Hadoop. I am using Hadoop 2.6.0 and trying to parse a complex XML file. After searching for a while, I learned that XML parsing needs a custom InputFormat, namely Mahout's XmlInputFormat. I also got help from this example.

But when I run my code with the XmlInputFormat class from that example set on the job, my own Mapper class is never called and the output file is empty.

Surprisingly, if I do not pass my XmlInputFormat class to the job, my mapper works fine and produces output correctly. Could someone help point out what I am missing here?

My job configuration class is:

public static void runParserJob(String inputPath, String outputPath) throws IOException {
    LOGGER.info("-----runParserJob()-----Start");
    Configuration configuration = new Configuration();
    configuration.set("xmlinput.start",Constants.XML_INPUT_START_TAG_PRODUCT);
    configuration.set("xmlinput.end",Constants.XML_INPUT_END_TAG_PRODUCT);
    configuration.set("io.serializations",Constants.XML_IO_SERIALIZATIONS);

    Job job = new Job(configuration,Constants.JOB_TITLE);
    FileInputFormat.setInputPaths(job, inputPath);
    job.setJarByClass(ParserDriver.class);
    job.setMapperClass(XMLMapper.class);
    job.setNumReduceTasks(0);
    job.setInputFormatClass(XmlInputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    Path hdfsOutputPath = new Path(outputPath);
    FileOutputFormat.setOutputPath(job, hdfsOutputPath);
    FileSystem dfs = FileSystem.get(hdfsOutputPath.toUri(),configuration);
    // Delete any existing output directory so the job can write to the same location.
    if(dfs.exists(hdfsOutputPath)){
        dfs.delete(hdfsOutputPath,true);
    }
    try{
        job.waitForCompletion(true);
    }catch(InterruptedException ie){
        LOGGER.error("-----Process interrupted in between Exception-----", ie);
    }catch(ClassNotFoundException ce){
        LOGGER.error("-----Class not found while running the job-----",ce);
    }
}

My XmlInputFormat class is:

public class XmlInputFormat extends TextInputFormat{

public static final String START_TAG_KEY = "xmlinput.start";
public static final String END_TAG_KEY = "xmlinput.end";

@Override
public RecordReader<LongWritable,Text> createRecordReader(InputSplit is, TaskAttemptContext tac)  {
    return new XmlRecordReader();
}

public static class XmlRecordReader extends RecordReader<LongWritable, Text>{
    private byte[] startTag;
    private byte[] endTag;
    private long start;
    private long end;
    private FSDataInputStream fsin;
    private DataOutputBuffer buffer = new DataOutputBuffer();
    private LongWritable key = new LongWritable();
    private Text value = new Text();

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit)inputSplit;
        startTag = taskAttemptContext.getConfiguration().get(START_TAG_KEY).getBytes("utf-8");
        endTag = taskAttemptContext.getConfiguration().get(END_TAG_KEY).getBytes("utf-8");

        start = fileSplit.getStart();
        end = start + fileSplit.getLength();
        Path file = fileSplit.getPath();

        FileSystem hdfs = file.getFileSystem(taskAttemptContext.getConfiguration());
        fsin = hdfs.open(fileSplit.getPath());
        fsin.seek(start);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if(fsin.getPos() < end){
            if(readUntilMatch(startTag,false)){
                try {
                    buffer.write(startTag);
                    if (readUntilMatch(endTag, true)) {
                        value.set(buffer.getData(), 0, buffer.getLength());
                        key.set(fsin.getPos());
                        return true;
                    }
                } finally {
                    buffer.reset();
                }
            }
        }
        return false;
    }

    @Override
    public void close() throws IOException {

    }

    @Override
    public LongWritable getCurrentKey() throws IOException,InterruptedException {
        return null;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return null;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException{
        int i = 0;
        while(true){
            int b = fsin.read();
            // End of file reached
            if(b == -1){
                return false;
            }   
            // Otherwise, when inside a record, save the byte into the buffer.
            if(withinBlock){
                buffer.write(b);
            }
            // check if we're matching:
            if (b == match[i]) {
              i++;
              if (i >= match.length) return true;
            } else i = 0;
            // see if we've passed the stop point:
            if (!withinBlock && i == 0 && fsin.getPos() >= end) return false;
        }
    }

}

}


Can anyone help me? Thanks in advance. If I have gone wrong somewhere, please correct me.

I'm not sure what your XML structure looks like, but suppose, for example, that your XML is structured like this:

<data>
   <product id="101" itemCategory="BER" transaction="PUR">
       <transaction-id>102A5RET</transaction-id>
       <item-name>Blue-Moon-12-PK-BTTLE</item-name>
       <item-purchased>2</item-purchased>
       <item-price>12.99</item-price>
       <time-stamp>2015-04-20 11:12:13 102301</time-stamp>
   </product>
   .
   .
   .
</data>

Your XmlInputFormat class needs to know which XML node you want to use:

configuration.set("xmlinput.start", "<product"); // note: only <product, without the closing >
configuration.set("xmlinput.end", "</product>"); // note: only </product>
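
To see why the start tag is given without the closing `>`, here is a small Hadoop-free sketch of the same byte-by-byte scan that readUntilMatch performs. It is only an illustration under assumptions: the class name TagScanDemo, the extract helper, and the shortened sample XML are made up for this example and are not part of Hadoop or Mahout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TagScanDemo {

    // Reads bytes until `match` has been seen in full; returns false at end of stream.
    // When `out` is non-null, every byte read is also copied into it.
    static boolean readUntilMatch(InputStream in, byte[] match, ByteArrayOutputStream out)
            throws IOException {
        int i = 0;
        while (true) {
            int b = in.read();
            if (b == -1) {
                return false;
            }
            if (out != null) {
                out.write(b);
            }
            if (b == (match[i] & 0xff)) {
                i++;
                if (i >= match.length) {
                    return true;
                }
            } else {
                i = 0;
            }
        }
    }

    // Hypothetical helper: collects every fragment that starts with startTag
    // and ends with endTag, the way the record reader builds its values.
    static List<String> extract(String xml, String startTag, String endTag) throws IOException {
        InputStream in = new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8));
        byte[] start = startTag.getBytes(StandardCharsets.UTF_8);
        byte[] end = endTag.getBytes(StandardCharsets.UTF_8);
        List<String> records = new ArrayList<>();
        while (readUntilMatch(in, start, null)) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            buffer.write(start);
            if (!readUntilMatch(in, end, buffer)) {
                break; // start tag without a matching end tag
            }
            records.add(new String(buffer.toByteArray(), StandardCharsets.UTF_8));
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        String xml = "<data><product id=\"101\"><item-price>12.99</item-price></product></data>";
        // "<product>" never appears literally because the element has attributes.
        System.out.println(extract(xml, "<product>", "</product>").size()); // prints 0
        // "<product" matches the opening tag regardless of its attributes.
        System.out.println(extract(xml, "<product", "</product>").size());  // prints 1
    }
}
```

One caveat with this kind of prefix match: `<product` would also match an element such as `<products>`, so it is worth checking that no other element name in your file begins the same way.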

Hope this helps!
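
One more thing that may be worth checking: in the XmlRecordReader you posted, getCurrentKey() and getCurrentValue() both return null, even though nextKeyValue() fills the key and value fields. Hadoop's Mapper.run() calls nextKeyValue() and then hands whatever those getters return to map(), so the mapper would likely receive null instead of the XML fragment. Returning the key and value fields from the two getters should address that. The sketch below is a simplified, Hadoop-free model of that loop; SimpleReader, WorkingReader, NullGetterReader and drive are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ReaderContractDemo {

    // Hypothetical stand-in for the two RecordReader methods that matter here.
    interface SimpleReader {
        boolean nextKeyValue();
        String getCurrentValue();
    }

    // Stores each record in a field and returns that field from the getter.
    static class WorkingReader implements SimpleReader {
        private final Iterator<String> source;
        private String value;

        WorkingReader(List<String> records) {
            this.source = records.iterator();
        }

        @Override
        public boolean nextKeyValue() {
            if (!source.hasNext()) {
                return false;
            }
            value = source.next();
            return true;
        }

        @Override
        public String getCurrentValue() {
            return value;
        }
    }

    // Same reader, but the getter returns null as in the posted XmlRecordReader.
    static class NullGetterReader extends WorkingReader {
        NullGetterReader(List<String> records) {
            super(records);
        }

        @Override
        public String getCurrentValue() {
            return null;
        }
    }

    // Simplified model of the loop the framework runs around map():
    // advance the reader, then pass the getter's result on.
    static List<String> drive(SimpleReader reader) {
        List<String> seen = new ArrayList<>();
        while (reader.nextKeyValue()) {
            seen.add(reader.getCurrentValue());
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("<product id=\"101\"></product>");
        System.out.println(drive(new WorkingReader(records)));    // prints [<product id="101"></product>]
        System.out.println(drive(new NullGetterReader(records))); // prints [null]
    }
}
```

In the real reader the equivalent change would be for getCurrentKey() to return key and getCurrentValue() to return value; closing fsin in close() would also avoid leaking the stream.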