Hadoop MapReduce chain with ArrayWritable
I am trying to create a MapReduce chain composed of two steps.
The first reducer emits (key, value) pairs where the value is a list of custom objects, and the second mapper should read the output of the first reducer.
The list is a custom ArrayWritable. Here is the relevant code:
The custom object:
public class Custom implements Writable {
    private Text document;
    private IntWritable count;

    public Custom() {
        setDocument("");
        setCount(0);
    }

    public Custom(String document, int count) {
        setDocument(document);
        setCount(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        document.readFields(in);
        count.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        document.write(out);
        count.write(out);
    }

    @Override
    public String toString() {
        return this.document.toString() + "\t" + this.count.toString();
    }

    public int getCount() {
        return count.get();
    }

    public void setCount(int count) {
        this.count = new IntWritable(count);
    }

    public String getDocument() {
        return document.toString();
    }

    public void setDocument(String document) {
        this.document = new Text(document);
    }
}
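As a side note (not part of the original post), a quick in-memory round trip can sanity-check Custom's write/readFields, using Hadoop's DataOutputBuffer and DataInputBuffer:

// Standalone check of Custom's serialization round trip (illustrative only).
DataOutputBuffer out = new DataOutputBuffer();
new Custom("doc1", 3).write(out);

DataInputBuffer in = new DataInputBuffer();
in.reset(out.getData(), out.getLength());

Custom copy = new Custom();   // no-arg constructor initializes both fields
copy.readFields(in);
System.out.println(copy);     // prints "doc1	3" via Custom.toString()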
The custom ArrayWritable:
class MyArrayWritable extends ArrayWritable {
    public MyArrayWritable(Writable[] values) {
        super(Custom.class, values);
    }

    public MyArrayWritable() {
        super(Custom.class);
    }

    @Override
    public Custom[] get() {
        return (Custom[]) super.get();
    }

    @Override
    public String toString() {
        return Arrays.toString(get());
    }

    @Override
    public void write(DataOutput arg0) throws IOException {
        super.write(arg0);
    }
}
The first reducer:
public static class NGramReducer extends Reducer<Text, Text, Text, MyArrayWritable> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        //other code
        context.write(key, mArrayWritable);
    }
}
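For context, here is one possible shape of the elided //other code, assuming each incoming Text value is a "document<TAB>count" string (that layout is my assumption, not from the post) and that java.util.List and java.util.ArrayList are imported:

public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    // Build one Custom per incoming value (assumed "document<TAB>count" layout).
    List<Custom> customs = new ArrayList<>();
    for (Text value : values) {
        String[] parts = value.toString().split("\t");
        customs.add(new Custom(parts[0], Integer.parseInt(parts[1])));
    }
    // Wrap the collected objects in the custom ArrayWritable and emit them under the key.
    MyArrayWritable mArrayWritable = new MyArrayWritable(customs.toArray(new Custom[0]));
    context.write(key, mArrayWritable);
}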
The second mapper:
public static class SecondMapper extends Mapper<Text, MyArrayWritable, Text, IntWritable> {
    private StringBuilder docBuilder = new StringBuilder();

    public void map(Text key, MyArrayWritable value, Context context)
            throws IOException, InterruptedException {
        //whatever code
    }
}
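Purely as an illustration of how the second mapper could consume the array (the real body is elided above), the sketch below reads the elements through toStrings() instead of get(), since ArrayWritable deserializes into a plain Writable[] and the Custom[] cast in get() would fail at runtime; the emitted (document, count) pairs are an assumption:

public void map(Text key, MyArrayWritable value, Context context)
        throws IOException, InterruptedException {
    // Each element renders as "document<TAB>count" via Custom.toString().
    for (String element : value.toStrings()) {
        String[] parts = element.split("\t");
        context.write(new Text(parts[0]), new IntWritable(Integer.parseInt(parts[1])));
    }
}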
These are the settings in the main:
//...
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(MyArrayWritable.class);
job1.setInputFormatClass(WholeFileInputFormat.class);
FileInputFormat.addInputPath(job1, new Path(args[2]));
FileOutputFormat.setOutputPath(job1, TEMP_PATH);
//...
job2.setInputFormatClass(KeyValueTextInputFormat.class);
FileInputFormat.addInputPath(job2, TEMP_PATH);
FileOutputFormat.setOutputPath(job2, new Path(args[3]));
When I run it I get this error:
Error: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to Detector$MyArrayWritable
What is the problem? Do I have to write a FileInputFormat? (Job 1 works fine.)
This looks like it is caused by your job 2 InputFormat. KeyValueTextInputFormat.class expects a key and a value that are both Text objects. Since your job 1 outputs (Text, MyArrayWritable), there is a conflict on the value.
Luckily you don't have to write a custom OutputFormat to cater for your data! Simply have job 1 write its output into sequence files, which keeps the data in binary form:
//...
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(MyArrayWritable.class);
job1.setInputFormatClass(WholeFileInputFormat.class);
job1.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.addInputPath(job1, new Path(args[2]));
SequenceFileOutputFormat.setOutputPath(job1, TEMP_PATH);
//...
job2.setInputFormatClass(SequenceFileInputFormat.class);
SequenceFileInputFormat.addInputPath(job2, TEMP_PATH);
FileOutputFormat.setOutputPath(job2, new Path(args[3]));
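To round things off, here is a sketch of how the two jobs could be chained in the driver under this setup; the class name Detector, the job names, the TEMP_PATH location, job 1's mapper and map-output settings are assumptions rather than details from the post:

public class Detector {
    private static final Path TEMP_PATH = new Path("/tmp/ngram-intermediate"); // assumed location

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Job 1: produces (Text, MyArrayWritable) pairs and stores them as a binary sequence file.
        Job job1 = Job.getInstance(conf, "job1");
        job1.setJarByClass(Detector.class);
        // job1.setMapperClass(...);            // whatever mapper the original project uses
        job1.setReducerClass(NGramReducer.class);
        job1.setMapOutputKeyClass(Text.class);  // reducer consumes (Text, Text)
        job1.setMapOutputValueClass(Text.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(MyArrayWritable.class);
        job1.setInputFormatClass(WholeFileInputFormat.class);
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileInputFormat.addInputPath(job1, new Path(args[2]));
        SequenceFileOutputFormat.setOutputPath(job1, TEMP_PATH);
        if (!job1.waitForCompletion(true)) {
            System.exit(1);
        }

        // Job 2: reads the (Text, MyArrayWritable) pairs back from the sequence file.
        Job job2 = Job.getInstance(conf, "job2");
        job2.setJarByClass(Detector.class);
        job2.setMapperClass(SecondMapper.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(IntWritable.class);
        job2.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.addInputPath(job2, TEMP_PATH);
        FileOutputFormat.setOutputPath(job2, new Path(args[3]));
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}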