如何使用 StreamXmlRecordReader 解析单个文件中的单行和多行 xml 记录
How to use StreamXmlRecordReader to parse single & multiline xml records within a single file
我有一个输入文件 (txt) 如下
<a><b><c>val1</c></b></a>||<a><b><c>val2</c></b></a>||<a><b>
<c>val3</c></b></a>||<a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>
仔细观察输入可以发现,第二个 '||' 之后的 xml 数据记录(即第 3 条记录)被拆分成了两行。
我想用hadoop streaming的StreamXmlRecordReader来解析这个文件
-inputreader "org.apache.hadoop.streaming.StreamXmlRecordReader,begin=<a>,end=</a>,slowmatch=true"
我无法解析第 3 条记录。
我收到以下错误
Traceback (most recent call last):
File "/home/rsome/test/code/m1.py", line 13, in <module>
root = ET.fromstring(xml_str.getvalue())
File "/usr/lib64/python2.6/xml/etree/ElementTree.py", line 964, in XML
return parser.close()
File "/usr/lib64/python2.6/xml/etree/ElementTree.py", line 1254, in close
self._parser.Parse("", 1) # end of data
xml.parsers.expat.ExpatError: no element found: line 1, column 18478
我也用过slowmatch=true,但还是不行。
我的输出如下
$ hdfs dfs -text /poc/testout001/part-*
rec::1::mapper1
<a><b><c>val1</c></b></a>
rec::2::mapper1
<a><b><c>val2</c></b></a>
rec::3::mapper1
<a><b>
rec::4::mapper1
<c>val3</c></b></a>
rec::1::mapper2
<a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>
我的预期输出是
$ hdfs dfs -text /poc/testout001/part-*
rec::1::mapper1
<a><b><c>val1</c></b></a>
rec::2::mapper1
<a><b><c>val2</c></b></a>
rec::3::mapper1
<a><b><c>val3</c></b></a>
rec::1::mapper2
<a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>
如能在这方面提供任何帮助,我将不胜感激。
基本上,hadoop-streaming 默认使用的输入格式 StreamXmlInputFormat 继承自 KeyValueTextInputFormat,它会在换行符 (\r\n) 处切分记录;而我的记录会跨越多行,这种行为并不符合我的需求。
因此,为了解决这个问题,我基于 FileInputFormat 实现了自己的输入格式,这样就可以越过换行符 (\r\n) 继续向后查找 endTag。
用法:
-libjars /path/to/custom-xml-input-format-1.0.0.jar \
-D xmlinput.start="<a>" \
-D xmlinput.end="</a>" \
-inputformat "my.package.CustomXmlInputFormat"
这是我使用的代码。
import java.io.*;
import java.lang.reflect.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.streaming.*;
/**
 * Old-API ({@code org.apache.hadoop.mapred}) input format that emits one record per span of
 * bytes delimited by a configurable start tag and end tag.
 *
 * <p>The tags are read from the job properties {@link #START_TAG_KEY} and {@link #END_TAG_KEY}.
 * Carriage-return and line-feed bytes found inside a record are dropped, so a record that is
 * physically spread over several lines is delivered as a single line.
 */
public class CustomXmlInputFormat extends FileInputFormat<LongWritable, Text> {
    /** Job property holding the tag that opens a record, e.g. {@code <a>}. */
    public static final String START_TAG_KEY = "xmlinput.start";
    /** Job property holding the tag that closes a record, e.g. {@code </a>}. */
    public static final String END_TAG_KEY = "xmlinput.end";

    /**
     * Creates the reader for one split.
     *
     * @param genericSplit the split to read; cast to {@link FileSplit}
     * @param job job configuration carrying the start/end tag properties
     * @param reporter passed through to the reader (not used by it)
     * @return a reader producing (position, record) pairs
     * @throws IOException if the underlying file cannot be opened or positioned
     */
    @Override
    public RecordReader<LongWritable, Text> getRecordReader(final InputSplit genericSplit,
            JobConf job, Reporter reporter) throws IOException {
        return new XmlRecordReader((FileSplit) genericSplit, job, reporter);
    }

    /**
     * Scans a file split byte by byte for {@code startTag ... endTag} spans.
     *
     * <p>A record is started only while the stream position is inside the split, but once
     * started it is read to its end tag even if that lies beyond the split end.
     */
    public static class XmlRecordReader implements RecordReader<LongWritable, Text> {
        private final byte[] endTag;
        private final byte[] startTag;
        private final long start;
        private final long end;
        private final FSDataInputStream fsin;
        private final DataOutputBuffer buffer = new DataOutputBuffer();

        /**
         * Opens the split's file and positions the stream at the split start.
         *
         * @param split the file split to read
         * @param conf job configuration; must define both tag properties
         * @param reporter unused, kept for signature compatibility
         * @throws IOException if the file cannot be opened or positioned
         * @throws IllegalArgumentException if a tag property is missing or empty
         */
        public XmlRecordReader(FileSplit split, JobConf conf, Reporter reporter) throws IOException {
            startTag = requiredTag(conf, START_TAG_KEY);
            endTag = requiredTag(conf, END_TAG_KEY);
            start = split.getStart();
            end = start + split.getLength();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = fs.open(file);
            try {
                in.seek(start);
            } catch (IOException e) {
                // Do not leak the stream when positioning fails; report the seek failure.
                try {
                    in.close();
                } catch (IOException ignored) {
                    // The seek failure is the more useful error for the caller.
                }
                throw e;
            }
            fsin = in;
        }

        /** Reads a tag property and fails with a descriptive message when it is unusable. */
        private static byte[] requiredTag(JobConf conf, String key) throws IOException {
            String tag = conf.get(key);
            if (tag == null || tag.length() == 0) {
                throw new IllegalArgumentException(
                        "Job property '" + key + "' must be set to a non-empty tag");
            }
            return tag.getBytes("UTF-8");
        }

        /**
         * Reads the next record.
         *
         * @param key set to the stream position just after the record's end tag
         * @param value set to the record bytes, start tag through end tag inclusive
         * @return {@code true} if a record was read, {@code false} when no further record exists
         * @throws IOException on read failure
         */
        @Override
        public boolean next(LongWritable key, Text value) throws IOException {
            if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
                try {
                    buffer.write(startTag);
                    if (readUntilMatch(endTag, true)) {
                        key.set(fsin.getPos());
                        value.set(buffer.getData(), 0, buffer.getLength());
                        return true;
                    }
                } finally {
                    // The buffer is reused for every record.
                    buffer.reset();
                }
            }
            return false;
        }

        /**
         * Consumes bytes until {@code match} has been read in full.
         *
         * @param match the byte sequence to look for; must be non-empty
         * @param withinBlock when {@code true}, every consumed byte except CR and LF is
         *     appended to the record buffer and the split end is ignored; when {@code false},
         *     the scan gives up once the split end is reached with no partial match pending
         * @return {@code true} if {@code match} was found, {@code false} on end of stream or
         *     when the split end stops the scan
         * @throws IOException on read failure
         */
        public boolean readUntilMatch(byte[] match, boolean withinBlock)
                throws IOException {
            int matched = 0;
            while (true) {
                int b = fsin.read();
                if (b == -1) {
                    return false;
                }
                if (withinBlock && b != '\r' && b != '\n') {
                    buffer.write(b);
                }
                // read() yields 0..255 while byte is signed, so mask before comparing;
                // otherwise tag bytes >= 0x80 could never match.
                if (b == (match[matched] & 0xFF)) {
                    matched++;
                    if (matched >= match.length) {
                        return true;
                    }
                } else {
                    // The mismatching byte may itself begin a new match (e.g. "<<a>").
                    matched = (b == (match[0] & 0xFF)) ? 1 : 0;
                }
                if (!withinBlock && matched == 0 && fsin.getPos() >= end) {
                    return false;
                }
            }
        }

        /**
         * @return the fraction of the split consumed, in the range 0.0 to 1.0; 0.0 for an
         *     empty split
         */
        @Override
        public float getProgress() throws IOException {
            if (end == start) {
                return 0.0f;
            }
            // A record may run past the split end, so cap the ratio at 1.
            return Math.min(1.0f, (fsin.getPos() - start) / (float) (end - start));
        }

        /** @return the current position of the underlying stream */
        @Override
        public synchronized long getPos() throws IOException {
            return fsin.getPos();
        }

        @Override
        public LongWritable createKey() {
            return new LongWritable();
        }

        @Override
        public Text createValue() {
            return new Text();
        }

        /** Closes the underlying stream. */
        @Override
        public synchronized void close() throws IOException {
            fsin.close();
        }
    }
}
这是我的输出
$ hdfs dfs -text /poc/testout001/part-*
25 <a><b><c>val1</c></b></a>
52 <a><b><c>val2</c></b></a>
80 <a><b><c>val3</c></b></a>
141 <a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>
我有一个输入文件 (txt) 如下
<a><b><c>val1</c></b></a>||<a><b><c>val2</c></b></a>||<a><b>
<c>val3</c></b></a>||<a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>
仔细观察输入可以发现,第二个 '||' 之后的 xml 数据记录(即第 3 条记录)被拆分成了两行。
我想用hadoop streaming的StreamXmlRecordReader来解析这个文件
-inputreader "org.apache.hadoop.streaming.StreamXmlRecordReader,begin=<a>,end=</a>,slowmatch=true"
我无法解析第 3 条记录。
我收到以下错误
Traceback (most recent call last):
File "/home/rsome/test/code/m1.py", line 13, in <module>
root = ET.fromstring(xml_str.getvalue())
File "/usr/lib64/python2.6/xml/etree/ElementTree.py", line 964, in XML
return parser.close()
File "/usr/lib64/python2.6/xml/etree/ElementTree.py", line 1254, in close
self._parser.Parse("", 1) # end of data
xml.parsers.expat.ExpatError: no element found: line 1, column 18478
我也用过slowmatch=true,但还是不行。
我的输出如下
$ hdfs dfs -text /poc/testout001/part-*
rec::1::mapper1
<a><b><c>val1</c></b></a>
rec::2::mapper1
<a><b><c>val2</c></b></a>
rec::3::mapper1
<a><b>
rec::4::mapper1
<c>val3</c></b></a>
rec::1::mapper2
<a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>
我的预期输出是
$ hdfs dfs -text /poc/testout001/part-*
rec::1::mapper1
<a><b><c>val1</c></b></a>
rec::2::mapper1
<a><b><c>val2</c></b></a>
rec::3::mapper1
<a><b><c>val3</c></b></a>
rec::1::mapper2
<a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>
如能在这方面提供任何帮助,我将不胜感激。
基本上,hadoop-streaming 默认使用的输入格式 StreamXmlInputFormat 继承自 KeyValueTextInputFormat,它会在换行符 (\r\n) 处切分记录;而我的记录会跨越多行,这种行为并不符合我的需求。
因此,为了解决这个问题,我基于 FileInputFormat 实现了自己的输入格式,这样就可以越过换行符 (\r\n) 继续向后查找 endTag。
用法:
-libjars /path/to/custom-xml-input-format-1.0.0.jar \
-D xmlinput.start="<a>" \
-D xmlinput.end="</a>" \
-inputformat "my.package.CustomXmlInputFormat"
这是我使用的代码。
import java.io.*;
import java.lang.reflect.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.streaming.*;
/**
 * Old-API ({@code org.apache.hadoop.mapred}) input format that emits one record per span of
 * bytes delimited by a configurable start tag and end tag.
 *
 * <p>The tags are read from the job properties {@link #START_TAG_KEY} and {@link #END_TAG_KEY}.
 * Carriage-return and line-feed bytes found inside a record are dropped, so a record that is
 * physically spread over several lines is delivered as a single line.
 */
public class CustomXmlInputFormat extends FileInputFormat<LongWritable, Text> {
    /** Job property holding the tag that opens a record, e.g. {@code <a>}. */
    public static final String START_TAG_KEY = "xmlinput.start";
    /** Job property holding the tag that closes a record, e.g. {@code </a>}. */
    public static final String END_TAG_KEY = "xmlinput.end";

    /**
     * Creates the reader for one split.
     *
     * @param genericSplit the split to read; cast to {@link FileSplit}
     * @param job job configuration carrying the start/end tag properties
     * @param reporter passed through to the reader (not used by it)
     * @return a reader producing (position, record) pairs
     * @throws IOException if the underlying file cannot be opened or positioned
     */
    @Override
    public RecordReader<LongWritable, Text> getRecordReader(final InputSplit genericSplit,
            JobConf job, Reporter reporter) throws IOException {
        return new XmlRecordReader((FileSplit) genericSplit, job, reporter);
    }

    /**
     * Scans a file split byte by byte for {@code startTag ... endTag} spans.
     *
     * <p>A record is started only while the stream position is inside the split, but once
     * started it is read to its end tag even if that lies beyond the split end.
     */
    public static class XmlRecordReader implements RecordReader<LongWritable, Text> {
        private final byte[] endTag;
        private final byte[] startTag;
        private final long start;
        private final long end;
        private final FSDataInputStream fsin;
        private final DataOutputBuffer buffer = new DataOutputBuffer();

        /**
         * Opens the split's file and positions the stream at the split start.
         *
         * @param split the file split to read
         * @param conf job configuration; must define both tag properties
         * @param reporter unused, kept for signature compatibility
         * @throws IOException if the file cannot be opened or positioned
         * @throws IllegalArgumentException if a tag property is missing or empty
         */
        public XmlRecordReader(FileSplit split, JobConf conf, Reporter reporter) throws IOException {
            startTag = requiredTag(conf, START_TAG_KEY);
            endTag = requiredTag(conf, END_TAG_KEY);
            start = split.getStart();
            end = start + split.getLength();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = fs.open(file);
            try {
                in.seek(start);
            } catch (IOException e) {
                // Do not leak the stream when positioning fails; report the seek failure.
                try {
                    in.close();
                } catch (IOException ignored) {
                    // The seek failure is the more useful error for the caller.
                }
                throw e;
            }
            fsin = in;
        }

        /** Reads a tag property and fails with a descriptive message when it is unusable. */
        private static byte[] requiredTag(JobConf conf, String key) throws IOException {
            String tag = conf.get(key);
            if (tag == null || tag.length() == 0) {
                throw new IllegalArgumentException(
                        "Job property '" + key + "' must be set to a non-empty tag");
            }
            return tag.getBytes("UTF-8");
        }

        /**
         * Reads the next record.
         *
         * @param key set to the stream position just after the record's end tag
         * @param value set to the record bytes, start tag through end tag inclusive
         * @return {@code true} if a record was read, {@code false} when no further record exists
         * @throws IOException on read failure
         */
        @Override
        public boolean next(LongWritable key, Text value) throws IOException {
            if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
                try {
                    buffer.write(startTag);
                    if (readUntilMatch(endTag, true)) {
                        key.set(fsin.getPos());
                        value.set(buffer.getData(), 0, buffer.getLength());
                        return true;
                    }
                } finally {
                    // The buffer is reused for every record.
                    buffer.reset();
                }
            }
            return false;
        }

        /**
         * Consumes bytes until {@code match} has been read in full.
         *
         * @param match the byte sequence to look for; must be non-empty
         * @param withinBlock when {@code true}, every consumed byte except CR and LF is
         *     appended to the record buffer and the split end is ignored; when {@code false},
         *     the scan gives up once the split end is reached with no partial match pending
         * @return {@code true} if {@code match} was found, {@code false} on end of stream or
         *     when the split end stops the scan
         * @throws IOException on read failure
         */
        public boolean readUntilMatch(byte[] match, boolean withinBlock)
                throws IOException {
            int matched = 0;
            while (true) {
                int b = fsin.read();
                if (b == -1) {
                    return false;
                }
                if (withinBlock && b != '\r' && b != '\n') {
                    buffer.write(b);
                }
                // read() yields 0..255 while byte is signed, so mask before comparing;
                // otherwise tag bytes >= 0x80 could never match.
                if (b == (match[matched] & 0xFF)) {
                    matched++;
                    if (matched >= match.length) {
                        return true;
                    }
                } else {
                    // The mismatching byte may itself begin a new match (e.g. "<<a>").
                    matched = (b == (match[0] & 0xFF)) ? 1 : 0;
                }
                if (!withinBlock && matched == 0 && fsin.getPos() >= end) {
                    return false;
                }
            }
        }

        /**
         * @return the fraction of the split consumed, in the range 0.0 to 1.0; 0.0 for an
         *     empty split
         */
        @Override
        public float getProgress() throws IOException {
            if (end == start) {
                return 0.0f;
            }
            // A record may run past the split end, so cap the ratio at 1.
            return Math.min(1.0f, (fsin.getPos() - start) / (float) (end - start));
        }

        /** @return the current position of the underlying stream */
        @Override
        public synchronized long getPos() throws IOException {
            return fsin.getPos();
        }

        @Override
        public LongWritable createKey() {
            return new LongWritable();
        }

        @Override
        public Text createValue() {
            return new Text();
        }

        /** Closes the underlying stream. */
        @Override
        public synchronized void close() throws IOException {
            fsin.close();
        }
    }
}
这是我的输出
$ hdfs dfs -text /poc/testout001/part-*
25 <a><b><c>val1</c></b></a>
52 <a><b><c>val2</c></b></a>
80 <a><b><c>val3</c></b></a>
141 <a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>