为什么 SequenceFile 被截断了?
Why the SequenceFile is truncated?
我正在学习Hadoop
,这个问题困扰了我一段时间。基本上我是将 SequenceFile
写入磁盘然后读回。但是,每次阅读时我都会得到一个EOFException
。仔细一看,写序列文件的时候,过早截断了,而且总是在写索引962之后发生,而且文件总是固定大小45056字节。
我在 MacBook Pro 上使用 Java 8 和 Hadoop 2.5.1。事实上,我在 Java 7 下的另一台 Linux 机器上尝试了相同的代码,但同样的事情发生了。
我可以排除writer/reader没有正确关闭。如代码所示,我尝试使用带有显式 writer.close() 的旧样式 try/catch,并且还使用较新的 try-with-resource 方法。两者都不起作用。
任何帮助将不胜感激。
以下是我使用的代码:
public class SequenceFileDemo {
private static final String[] DATA = { "One, two, buckle my shoe",
"Three, four, shut the door",
"Five, six, pick up sticks",
"Seven, eight, lay them straight",
"Nine, ten, a big fat hen" };
public static void main(String[] args) throws Exception {
String uri = "file:///Users/andy/Downloads/puzzling.seq";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
IntWritable key = new IntWritable();
Text value = new Text();
//API change
try {
SequenceFile.Writer writer = SequenceFile.createWriter(conf,
stream(fs.create(path)),
keyClass(IntWritable.class),
valueClass(Text.class));
for ( int i = 0; i < 1024; i++ ) {
key.set( i);
value.clear();
value.set(DATA[i % DATA.length]);
writer.append(key, value);
if ( (i-1) %100 == 0 ) writer.hflush();
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
}
writer.close();
} catch (Exception e ) {
e.printStackTrace();
}
try {
SequenceFile.Reader reader = new SequenceFile.Reader(conf,
SequenceFile.Reader.file(path));
Class<?> keyClass = reader.getKeyClass();
Class<?> valueClass = reader.getValueClass();
boolean isWritableSerilization = false;
try {
keyClass.asSubclass(WritableComparable.class);
isWritableSerilization = true;
} catch (ClassCastException e) {
}
if ( isWritableSerilization ) {
WritableComparable<?> rKey = (WritableComparable<?>) ReflectionUtils.newInstance(keyClass, conf);
Writable rValue = (Writable) ReflectionUtils.newInstance(valueClass, conf);
while(reader.next(rKey, rValue)) {
System.out.printf("[%s] %d %s=%s\n",reader.syncSeen(), reader.getPosition(), rKey, rValue);
}
} else {
//make sure io.seraizliatons has the serialization in use when write the sequence file
}
reader.close();
} catch(IOException e) {
e.printStackTrace();
}
}
}
我认为您在写入循环后缺少 writer.close()。这应该保证在你开始阅读之前最后冲洗一下。
我确实发现了错误,那是因为你从来没有关闭在 Writer.stream(fs.create(path))
中创建的流。
出于某种原因,关闭不会向下传播到您刚刚在那里创建的流。我想这是一个错误,但我暂时懒得在 Jira 中查找它。
解决问题的一种方法是简单地使用 Writer.file(path)
。
显然,您也可以直接关闭创建流。在下面找到我更正的示例:
Path path = new Path("file:///tmp/puzzling.seq");
try (FSDataOutputStream stream = fs.create(path)) {
try (SequenceFile.Writer writer = SequenceFile.createWriter(conf, Writer.stream(stream),
Writer.keyClass(IntWritable.class), Writer.valueClass(NullWritable.class))) {
for (int i = 0; i < 1024; i++) {
writer.append(new IntWritable(i), NullWritable.get());
}
}
}
try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, Reader.file(path))) {
Class<?> keyClass = reader.getKeyClass();
Class<?> valueClass = reader.getValueClass();
WritableComparable<?> rKey = (WritableComparable<?>) ReflectionUtils.newInstance(keyClass, conf);
Writable rValue = (Writable) ReflectionUtils.newInstance(valueClass, conf);
while (reader.next(rKey, rValue)) {
System.out.printf("%s = %s\n", rKey, rValue);
}
}
感谢托马斯。
归根结底是作者是否创建了"owns"不是的流。创建writer时,如果传入选项Writer.file(path),writer "owns"内部创建的底层流,close( ) 叫做。然而,如果我们传入 Writer.stream(aStream),作者会假设其他人是该流的响应,并且不会在调用 close() 时关闭它。简而言之,这不是错误,只是我对它的了解不够深入。 .
我正在学习Hadoop
,这个问题困扰了我一段时间。基本上我是将 SequenceFile
写入磁盘然后读回。但是,每次阅读时我都会得到一个EOFException
。仔细一看,写序列文件的时候,过早截断了,而且总是在写索引962之后发生,而且文件总是固定大小45056字节。
我在 MacBook Pro 上使用 Java 8 和 Hadoop 2.5.1。事实上,我在 Java 7 下的另一台 Linux 机器上尝试了相同的代码,但同样的事情发生了。
我可以排除writer/reader没有正确关闭。如代码所示,我尝试使用带有显式 writer.close() 的旧样式 try/catch,并且还使用较新的 try-with-resource 方法。两者都不起作用。
任何帮助将不胜感激。
以下是我使用的代码:
public class SequenceFileDemo {
private static final String[] DATA = { "One, two, buckle my shoe",
"Three, four, shut the door",
"Five, six, pick up sticks",
"Seven, eight, lay them straight",
"Nine, ten, a big fat hen" };
public static void main(String[] args) throws Exception {
String uri = "file:///Users/andy/Downloads/puzzling.seq";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
IntWritable key = new IntWritable();
Text value = new Text();
//API change
try {
SequenceFile.Writer writer = SequenceFile.createWriter(conf,
stream(fs.create(path)),
keyClass(IntWritable.class),
valueClass(Text.class));
for ( int i = 0; i < 1024; i++ ) {
key.set( i);
value.clear();
value.set(DATA[i % DATA.length]);
writer.append(key, value);
if ( (i-1) %100 == 0 ) writer.hflush();
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
}
writer.close();
} catch (Exception e ) {
e.printStackTrace();
}
try {
SequenceFile.Reader reader = new SequenceFile.Reader(conf,
SequenceFile.Reader.file(path));
Class<?> keyClass = reader.getKeyClass();
Class<?> valueClass = reader.getValueClass();
boolean isWritableSerilization = false;
try {
keyClass.asSubclass(WritableComparable.class);
isWritableSerilization = true;
} catch (ClassCastException e) {
}
if ( isWritableSerilization ) {
WritableComparable<?> rKey = (WritableComparable<?>) ReflectionUtils.newInstance(keyClass, conf);
Writable rValue = (Writable) ReflectionUtils.newInstance(valueClass, conf);
while(reader.next(rKey, rValue)) {
System.out.printf("[%s] %d %s=%s\n",reader.syncSeen(), reader.getPosition(), rKey, rValue);
}
} else {
//make sure io.seraizliatons has the serialization in use when write the sequence file
}
reader.close();
} catch(IOException e) {
e.printStackTrace();
}
}
}
我认为您在写入循环后缺少 writer.close()。这应该保证在你开始阅读之前最后冲洗一下。
我确实发现了错误,那是因为你从来没有关闭在 Writer.stream(fs.create(path))
中创建的流。
出于某种原因,关闭不会向下传播到您刚刚在那里创建的流。我想这是一个错误,但我暂时懒得在 Jira 中查找它。
解决问题的一种方法是简单地使用 Writer.file(path)
。
显然,您也可以直接关闭创建流。在下面找到我更正的示例:
Path path = new Path("file:///tmp/puzzling.seq");
try (FSDataOutputStream stream = fs.create(path)) {
try (SequenceFile.Writer writer = SequenceFile.createWriter(conf, Writer.stream(stream),
Writer.keyClass(IntWritable.class), Writer.valueClass(NullWritable.class))) {
for (int i = 0; i < 1024; i++) {
writer.append(new IntWritable(i), NullWritable.get());
}
}
}
try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, Reader.file(path))) {
Class<?> keyClass = reader.getKeyClass();
Class<?> valueClass = reader.getValueClass();
WritableComparable<?> rKey = (WritableComparable<?>) ReflectionUtils.newInstance(keyClass, conf);
Writable rValue = (Writable) ReflectionUtils.newInstance(valueClass, conf);
while (reader.next(rKey, rValue)) {
System.out.printf("%s = %s\n", rKey, rValue);
}
}
感谢托马斯。
归根结底是作者是否创建了"owns"不是的流。创建writer时,如果传入选项Writer.file(path),writer "owns"内部创建的底层流,close( ) 叫做。然而,如果我们传入 Writer.stream(aStream),作者会假设其他人是该流的响应,并且不会在调用 close() 时关闭它。简而言之,这不是错误,只是我对它的了解不够深入。 .