How to use StreamXmlRecordReader to parse single & multiline xml records within a single file

I have an input file (txt) like the following:

<a><b><c>val1</c></b></a>||<a><b><c>val2</c></b></a>||<a><b>
<c>val3</c></b></a>||<a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>

Looking closely at the input, the XML record after the third '||' is split across two lines.

I want to parse this file using Hadoop Streaming's StreamXmlRecordReader:

-inputreader "org.apache.hadoop.streaming.StreamXmlRecordReader,begin=<a>,end=</a>,slowmatch=true"
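
For context, the rest of the streaming command is along these lines (reconstructed here; the streaming jar and input paths are illustrative, and the mapper is the m1.py from the traceback below):

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input /poc/test.txt \
    -output /poc/testout001 \
    -mapper m1.py \
    -file /home/rsome/test/code/m1.py \
    -inputreader "org.apache.hadoop.streaming.StreamXmlRecordReader,begin=<a>,end=</a>,slowmatch=true"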

I am unable to parse the 3rd record.

I am getting the following error:

Traceback (most recent call last):
  File "/home/rsome/test/code/m1.py", line 13, in <module>
    root = ET.fromstring(xml_str.getvalue())
  File "/usr/lib64/python2.6/xml/etree/ElementTree.py", line 964, in XML
    return parser.close()
  File "/usr/lib64/python2.6/xml/etree/ElementTree.py", line 1254, in close
    self._parser.Parse("", 1) # end of data
xml.parsers.expat.ExpatError: no element found: line 1, column 18478

I have also tried slowmatch=true, but it still doesn't work: record 3 comes through truncated at the line break, so the mapper ends up feeding incomplete XML to ElementTree, which fails with "no element found" when it reaches the end of its input.

My output is as follows:

$ hdfs dfs -text /poc/testout001/part-*
rec::1::mapper1
<a><b><c>val1</c></b></a>
rec::2::mapper1
<a><b><c>val2</c></b></a>
rec::3::mapper1
<a><b>
rec::4::mapper1
<c>val3</c></b></a>
rec::1::mapper2
<a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>

My expected output is:

$ hdfs dfs -text /poc/testout001/part-*
rec::1::mapper1
<a><b><c>val1</c></b></a>
rec::2::mapper1
<a><b><c>val2</c></b></a>
rec::3::mapper1
<a><b><c>val3</c></b></a>
rec::1::mapper2
<a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>

Any help on this would be greatly appreciated.

Basically, StreamXmlInputFormat, the default input format for hadoop-streaming, extends KeyValueTextInputFormat, which splits records at newline characters (\r\n). That is not what I want when a record itself spans multiple lines.
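
To see why that breaks multiline records, here is a minimal sketch in plain Java (no Hadoop involved): a line-oriented reader hands the third record back in two pieces, which is exactly what the StreamXmlRecordReader output above shows.

import java.io.BufferedReader;
import java.io.StringReader;

public class LineSplitDemo {
  public static void main(String[] args) throws Exception {
    // The third record from the input file, split across two lines.
    String data = "<a><b>\n<c>val3</c></b></a>";
    BufferedReader reader = new BufferedReader(new StringReader(data));
    String line;
    while ((line = reader.readLine()) != null) {
      System.out.println("record: " + line);
    }
    // Prints:
    // record: <a><b>
    // record: <c>val3</c></b></a>
  }
}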

So, to overcome this, I implemented my own input format extending FileInputFormat, in which I am free to keep reading past newline characters (\r\n) until I find my endTag.

Usage:

-libjars /path/to/custom-xml-input-format-1.0.0.jar \
-D xmlinput.start="<a>" \
-D xmlinput.end="</a>" \
-inputformat "my.package.CustomXmlInputFormat"
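
Putting it together, a complete invocation would look something like this (the streaming jar location, input path, and identity mapper are placeholders; substitute whatever your job actually uses):

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -libjars /path/to/custom-xml-input-format-1.0.0.jar \
    -D xmlinput.start="<a>" \
    -D xmlinput.end="</a>" \
    -inputformat "my.package.CustomXmlInputFormat" \
    -input /poc/test.txt \
    -output /poc/testout001 \
    -mapper /bin/cat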

Here is the code I used:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

/**
 * An old-API (mapred) input format that emits one XML record per value,
 * delimited by the configured start and end tags, ignoring line breaks.
 */
public class CustomXmlInputFormat extends FileInputFormat<LongWritable, Text> {

  public static final String START_TAG_KEY = "xmlinput.start";
  public static final String END_TAG_KEY = "xmlinput.end";

  @Override
  public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit,
      JobConf job, Reporter reporter) throws IOException {
    return new XmlRecordReader((FileSplit) genericSplit, job, reporter);
  }

  public static class XmlRecordReader implements RecordReader<LongWritable, Text> {

    private final byte[] startTag;
    private final byte[] endTag;
    private final long start;
    private final long end;
    private final FSDataInputStream fsin;
    private final DataOutputBuffer buffer = new DataOutputBuffer();

    public XmlRecordReader(FileSplit split, JobConf conf, Reporter reporter) throws IOException {
      startTag = conf.get(START_TAG_KEY).getBytes("UTF-8");
      endTag = conf.get(END_TAG_KEY).getBytes("UTF-8");

      // Open the file and seek to the beginning of this split.
      start = split.getStart();
      end = start + split.getLength();
      Path file = split.getPath();
      FileSystem fs = file.getFileSystem(conf);
      fsin = fs.open(file);
      fsin.seek(start);
    }

    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
      // Find the next start tag, then buffer everything up to and
      // including the matching end tag as one record.
      if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
        try {
          buffer.write(startTag);
          if (readUntilMatch(endTag, true)) {
            key.set(fsin.getPos());
            value.set(buffer.getData(), 0, buffer.getLength());
            return true;
          }
        } finally {
          buffer.reset();
        }
      }
      return false;
    }

    private boolean readUntilMatch(byte[] match, boolean withinBlock)
        throws IOException {
      int i = 0;
      while (true) {
        int b = fsin.read();
        if (b == -1) {
          return false; // end of file
        }

        // Inside a record, keep every byte except \r and \n; dropping
        // the line breaks is what joins a multiline record back together.
        if (withinBlock && b != '\r' && b != '\n') {
          buffer.write(b);
        }

        // Advance the tag match; reset on a mismatch.
        if (b == match[i]) {
          i++;
          if (i >= match.length) {
            return true;
          }
        } else {
          i = 0;
        }

        // Outside a record, give up once we run past the end of the split.
        if (!withinBlock && i == 0 && fsin.getPos() >= end) {
          return false;
        }
      }
    }

    @Override
    public float getProgress() throws IOException {
      return (fsin.getPos() - start) / (float) (end - start);
    }

    @Override
    public synchronized long getPos() throws IOException {
      return fsin.getPos();
    }

    @Override
    public LongWritable createKey() {
      return new LongWritable();
    }

    @Override
    public Text createValue() {
      return new Text();
    }

    @Override
    public synchronized void close() throws IOException {
      fsin.close();
    }
  }
}
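
The important detail is in readUntilMatch: while it is buffering a record (withinBlock is true), it silently drops \r and \n bytes, so a record that arrived split across lines is emitted as a single line. That is why record 3 comes out joined below. The key emitted for each record is the byte offset just past its end tag.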

Here is my output:

$ hdfs dfs -text /poc/testout001/part-*
25      <a><b><c>val1</c></b></a>
52      <a><b><c>val2</c></b></a>
80      <a><b><c>val3</c></b></a>
141     <a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>