Hadoop Writable readFields EOFException
I am implementing my own Writable for a Hadoop secondary sort, but when I run the job, Hadoop keeps throwing an EOFException in my readFields method and I can't figure out what is wrong with it.
The error stack trace:
java.lang.Exception: java.lang.RuntimeException: java.io.EOFException
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:492)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:559)
Caused by: java.lang.RuntimeException: java.io.EOFException
at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:165)
at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKeyValue(ReduceContextImpl.java:158)
at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKey(ReduceContextImpl.java:121)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.nextKey(WrappedReducer.java:302)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:170)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:628)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:390)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:347)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.hadoop.io.IntWritable.readFields(IntWritable.java:47)
at writable.WikiWritable.readFields(WikiWritable.java:39)
at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:158)
... 12 more
My code:
package writable;
import org.apache.hadoop.io.*;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class WikiWritable implements WritableComparable<WikiWritable> {
private IntWritable docId;
private IntWritable position;
public WikiWritable() {
this.docId = new IntWritable();
this.position = new IntWritable();
}
public void set(String docId, int position) {
this.docId = new IntWritable(Integer.valueOf(docId));
this.position = new IntWritable(position);
}
@Override
public int compareTo(WikiWritable o) {
int result = this.docId.compareTo(o.docId);
result = result == 0 ? this.position.compareTo(o.position) : result;
return result;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
docId.write(dataOutput);
position.write(dataOutput); // error here
}
@Override
public void readFields(DataInput dataInput) throws IOException {
docId.readFields(dataInput);
position.readFields(dataInput);
}
public IntWritable getDocId() {
return docId;
}
public int getPosition() {
return Integer.valueOf(position.toString());
}
}
// Driver
public class Driver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Path wiki = new Path(args[0]);
Path out = new Path(args[1]);
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "myjob");
TextInputFormat.addInputPath(job, wiki);
TextOutputFormat.setOutputPath(job, out);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(WikiWritable.class);
job.setJarByClass(Driver.class);
job.setMapperClass(WordMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setReducerClass(WordReducer.class);
job.setPartitionerClass(WikiPartitioner.class);
job.setGroupingComparatorClass(WikiComparator.class);
job.waitForCompletion(true);
}
}
// Mapper.map
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split(",");
String id = words[0];
String[] contents = words[3].toLowerCase().replaceAll("[^a-z]+", " ").split("\\s+");
for (int i = 0; i < contents.length; i++) {
String word = contents[i].trim();
word = stem(word);
WikiWritable output = new WikiWritable();
output.set(id, i);
context.write(new Text(contents[i]), output);
}
}
// Comparator
public class WikiComparator extends WritableComparator {
public WikiComparator() {
super(WikiWritable.class, true);
}
@Override
public int compare(WritableComparable wc1, WritableComparable wc2) {
WikiWritable w1 = (WikiWritable) wc1;
WikiWritable w2 = (WikiWritable) wc2;
return w1.compareTo(w2);
}
}
// Partitioner
public class WikiPartitioner extends Partitioner<WikiWritable, Text> {
@Override
public int getPartition(WikiWritable wikiWritable, Text text, int i) {
return Math.abs(wikiWritable.getDocId().hashCode() % i);
}
}
// Reducer
public class WordReducer extends Reducer<Text, WikiWritable, Text, Text> {
@Override
protected void reduce(Text key, Iterable<WikiWritable> values, Context ctx) throws IOException, InterruptedException {
Map<String, StringBuilder> map = new HashMap<>();
for (WikiWritable w : values) {
String id = String.valueOf(w.getDocId());
if (map.containsKey(id)) {
map.get(id).append(w.getPosition()).append(".");
} else {
map.put(id, new StringBuilder());
map.get(id).append(".").append(w.getPosition()).append(".");
}
}
StringBuilder builder = new StringBuilder();
map.keySet().forEach((k) -> {
map.get(k).deleteCharAt(map.get(k).length() - 1);
builder.append(k).append(map.get(k)).append(";");
});
ctx.write(key, new Text(builder.toString()));
}
}
When a new WikiWritable is constructed, the mapper first calls new WikiWritable() and then set(...).
I have tried changing docId and position to String and Integer and using dataOutput.read() (I forget the exact method name, but it was something like that), and it still does not work.
TL;DR: You just need to delete your WikiComparator entirely and not call job.setGroupingComparatorClass at all.
Explanation:
The grouping comparator is used to compare map output keys, not map output values. Your map output keys are Text objects and your map output values are WikiWritable objects.
That means the bytes handed to the comparator for deserialization represent serialized Text objects. However, WikiComparator creates WikiWritable instances via reflection (as configured in its constructor) and then tries to deserialize those Text bytes with the WikiWritable.readFields method. This obviously reads the wrong thing, which causes the exception you are seeing.
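To make the failure mode concrete, here is a minimal standalone sketch (hypothetical demo code, not part of the original job) that serializes a short Text key into a buffer and then reads it back through WikiWritable.readFields: the buffer only holds a vint length plus the UTF-8 bytes of the word, while readFields tries to pull two 4-byte ints out of it, so it fails the same way the job does.

// Hypothetical demo: misread serialized Text bytes as WikiWritable fields.
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import writable.WikiWritable;

public class MisreadDemo {
    public static void main(String[] args) throws Exception {
        Text key = new Text("ab");                      // a short map output key
        DataOutputBuffer out = new DataOutputBuffer();
        key.write(out);                                 // Text writes a vint length + UTF-8 bytes (3 bytes total here)

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());

        WikiWritable w = new WikiWritable();
        w.readFields(in);                               // wants two 4-byte ints but only 3 bytes exist -> EOFException
    }
}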
That said, I believe you do not need a comparator at all, because the default WritableComparator does exactly what yours does: it calls the compareTo method on the pair of objects passed to it.
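In the driver this just means leaving the rest of the job setup untouched and not registering the comparator; a sketch of the relevant part of Driver.main:

// Sketch of Driver.main with the grouping comparator registration dropped.
job.setReducerClass(WordReducer.class);
job.setPartitionerClass(WikiPartitioner.class);
// job.setGroupingComparatorClass(WikiComparator.class);  // removed: Text keys group correctly by default
job.waitForCompletion(true);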
Edit: the compareTo method that gets invoked compares your keys, not your values, so it compares Text objects. If you want your WikiWritables to be compared and sorted, you should consider putting them into a composite key. There are plenty of tutorials on composite keys and secondary sorting.
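For reference, such a composite key could look roughly like the hypothetical WikiKey below (not from the original post; names and layout are assumptions): the word acts as the natural key used for grouping and partitioning, while docId and position only influence the sort order inside a group.

// Hypothetical composite key sketch for secondary sort.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class WikiKey implements WritableComparable<WikiKey> {
    private final Text word = new Text();
    private final IntWritable docId = new IntWritable();
    private final IntWritable position = new IntWritable();

    public void set(String word, int docId, int position) {
        this.word.set(word);
        this.docId.set(docId);
        this.position.set(position);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        word.write(out);
        docId.write(out);
        position.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        word.readFields(in);
        docId.readFields(in);
        position.readFields(in);
    }

    @Override
    public int compareTo(WikiKey o) {
        int c = word.compareTo(o.word);          // natural key first
        if (c == 0) c = docId.compareTo(o.docId);
        if (c == 0) c = position.compareTo(o.position);
        return c;
    }

    public Text getWord() { return word; }
    public IntWritable getDocId() { return docId; }
}

The partitioner and the grouping comparator would then look only at word (so all positions for one word reach the same reduce call), while the full compareTo drives the secondary sort; that is the pattern most composite-key tutorials walk through.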