IOException after mapping before reducing

I have a dataset in which each record consists of 2 fields: a URL and its lifetime in seconds.

I want to calculate the average lifetime in days for each domain, i.e. if I have these 2 records:

hadoop.apache.org/docs/current 22118400
hadoop.apache.org/docs/current/api/org/ 27820800

then I should get this answer:

hadoop.apache.org 289
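
To make the expected arithmetic explicit (this assumes the second field really is a lifetime in seconds, which matches the SEC_IN_DAY constant in the job below):

// Sketch of the arithmetic behind the expected 289:
float sum = 22118400f + 27820800f;        // total lifetime in seconds
int days = Math.round(sum / (2 * 86400)); // 2 records, 86400 s per day -> 289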

For these calculations I wrote a Hadoop job:

package ru.bdata.siteslifes;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import ru.bdata.siteslifes.arrays.IntArrayWritable;

import java.io.IOException;
import java.util.Iterator;

public class AvgSiteLifeCounter extends Configured implements Tool {

    // Emits (domain, lifetime) pairs: the key is everything before the first '/'.
    public static class DomainMapper extends Mapper<Text, IntWritable, Text, IntWritable> {

        @Override
        public void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
            String url = key.toString();
            // Assumes every URL contains a '/'; indexOf would return -1 otherwise.
            context.write(new Text(url.substring(0, url.indexOf('/'))), value);
        }
    }

    // Averages the lifetimes per key and converts seconds to whole days.
    public static class AvgCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
        private static final int SEC_IN_DAY = 86400;

        @Override
        public void reduce(Text key, Iterable<IntWritable> value, Context context)
                throws IOException, InterruptedException {
            float sum = 0;
            int cnt = 0;
            Iterator<IntWritable> it = value.iterator();
            while (it.hasNext()) {
                sum += it.next().get();
                cnt++;
            }
            // Mean lifetime in seconds divided by 86400, rounded to whole days.
            context.write(key, new IntWritable(Math.round(sum / (cnt * SEC_IN_DAY))));
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf);
        job.setJarByClass(AvgSiteLifeCounter.class);

        job.setMapperClass(DomainMapper.class);
        job.setCombinerClass(AvgCombiner.class);
        job.setReducerClass(Reducer.class);

        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setNumReduceTasks(8);

        SequenceFileInputFormat.addInputPath(job, new Path(strings[0]));
        SequenceFileOutputFormat.setOutputPath(job, new Path(strings[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
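
For completeness, a minimal sketch of how such a Tool is typically launched (the main method is not shown in the code above; ToolRunner is the usual pattern, and this is an assumption, not part of the original job):

// Sketch of a conventional entry point; requires org.apache.hadoop.util.ToolRunner.
public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new AvgSiteLifeCounter(), args));
}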

When I run the program on the cluster, the map part completes fine, but before the reduce part starts I see an exception:

java.io.IOException: wrong key class: org.apache.hadoop.io.Text is not class org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.io.SequenceFile$Writer.append(SequenceFile.java:1305)
    at org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.write(SequenceFileOutputFormat.java:74)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:551)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:85)
    at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:99)
    at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:144)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:164)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:610)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:444)
    at org.apache.hadoop.mapred.Child.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.ja...

How should I change my code so that the reduce part works as well?

Since this job is only one part of a larger Hadoop task, the input and output data are represented as binary files (SequenceFile).
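
One way to sanity-check a mismatch like this is to read the key/value class names straight out of the SequenceFile metadata. A minimal sketch, with a hypothetical part-file path (the old-style Reader constructor matches the Hadoop 1.x classes in the stack trace):

// Sketch: print what a SequenceFile actually stores (the path is hypothetical).
Configuration conf = new Configuration();
Path path = new Path("/path/to/input/part-00000");
SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.get(conf), path, conf);
System.out.println(reader.getKeyClassName());   // e.g. org.apache.hadoop.io.Text
System.out.println(reader.getValueClassName()); // e.g. org.apache.hadoop.io.IntWritable
reader.close();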

P.S. As you can see, I don't use LongWritable anywhere, only IntWritable. Yet the exception log mentions LongWritable.

After the map stage Hadoop writes the results to temporary files, and the reducer then reads that data. The run() method had no key/value class setup for the data that has to be read back from those temporary files. So after setting the key and value classes for the mapper's results, my code works, i.e. I added these lines to the run() method:

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

It is somewhat confusing that Hadoop throws an IOException here, when what really exists is a type mismatch between the types the driver code declares for a job and the types actually received at runtime.
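
For what it's worth, the LongWritable in the message has a concrete source: a job that never declares its final output classes defaults to LongWritable for the output key, and SequenceFileOutputFormat validates every record it writes against that declaration. So with the new API, the equivalent explicit declaration in run() would look like this (a sketch along the same lines as the fix above):

// Declare the final (reducer) output classes so SequenceFileOutputFormat
// validates against Text/IntWritable instead of the LongWritable default.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);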