Type mismatch between Mapper and Reducer

I have the following output schema

public static class RecordMapper extends Mapper<Object, Text, Text, RecordWritable>

public static class JoinSumReducer extends Reducer<Text, RecordWritable, Text, DoubleWritable>

I am getting the runtime exception java.lang.Exception: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable (full stack trace after the code).

I tried the solution proposed in Type mismatch in value from map: expected org.apache.hadoop.io.NullWritable, recieved org.apache.hadoop.io.Text, but that leads to a different runtime exception: java.io.IOException: wrong value class: class org.apache.hadoop.io.DoubleWritable is not class RecordWritable (full stack trace after the code).

Clearly there is a type mismatch somewhere, but I have followed all the value definitions and cannot find what I am missing. Is there somewhere else I need to define the types being used?

Here is my code.

The Writable enum class

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;

/**
 * Writable for enum Record
 */
public class RecordWritable implements Writable{
    public static enum Record {BUY, CLICK};
    private Record data;

    public void set(Record data) {
        this.data = data;
    }

    public Record get() {
        return this.data;
    }

    public void readFields(DataInput dataInput) throws IOException {
        data = WritableUtils.readEnum(dataInput, Record.class); 
    }

    public void write(DataOutput dataOutput) throws IOException {
        WritableUtils.writeEnum(dataOutput,data);
    }
}

The Mapper/Reducer and Main

import java.io.IOException;
import java.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class SuccessRate {

    /**
     * Mapper
     * - Key = ItemID
     * - Value = The type of record is determined by number of columns
     */
    public static class RecordMapper extends Mapper<Object, Text, Text, RecordWritable>{

        private Text itemID = new Text();
        private RecordWritable record = new RecordWritable();
        Pattern itemIDpattern = Pattern.compile("^(\\d+),");
        Pattern columnPattern = Pattern.compile(",");

        public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
            Scanner itr = new Scanner(value.toString());
            while (itr.hasNextLine()) {
                String line = itr.nextLine();

                String id = null;
                Matcher m = itemIDpattern.matcher(line);
                if(m.find())
                    id = m.group(1);

                RecordWritable.Record fileType;
                int count = StringUtils.countMatches(line, ",");
                if(count==4)
                    fileType = RecordWritable.Record.CLICK;
                else
                    fileType = RecordWritable.Record.BUY;

                if(id != null) {
                    itemID.set(id);
                    record.set(fileType);
                    context.write(itemID, record);
                }
            }
            itr.close();
        }
    }

    /**
     * Reducer
     * - Key : ItemID
     * - Value : sum of buys / sum of clicks
     */
    public static class JoinSumReducer
    extends Reducer<Text, RecordWritable, Text, DoubleWritable> {
        private DoubleWritable result = new DoubleWritable();

        public void reduce(Text key, Iterable<RecordWritable> values,
                Context context
                ) throws IOException, InterruptedException {
            int sumClick = 0;
            int sumBuy = 0;
            for (RecordWritable val : values) {
                switch(val.get()) {
                case CLICK:
                    sumClick += 1;
                    break;
                case BUY:
                    sumBuy += 1;
                    break;
                }
            }
            result.set((double)sumBuy/(double)sumClick);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {       
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "success rate");
        job.setJarByClass(SuccessRate.class);
        job.setMapperClass(RecordMapper.class);
        job.setCombinerClass(JoinSumReducer.class);
        job.setReducerClass(JoinSumReducer.class);
//      job.setMapOutputKeyClass(Text.class); // I tried adding these two lines after reading 
//      job.setMapOutputValueClass(RecordWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The full exception stack traces

The original error

java.lang.Exception: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:492)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:552)
Caused by: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1093)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:727)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at SuccessRate$RecordMapper.map(SuccessRate.java:54)
    at SuccessRate$RecordMapper.map(SuccessRate.java:26)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:799)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I tried the solution proposed in Type mismatch in value from map: expected org.apache.hadoop.io.NullWritable, recieved org.apache.hadoop.io.Text, but that led to this runtime exception:

2018-09-24 11:36:04,423 INFO mapred.MapTask: Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewOutputCollector@5532c2f8
java.io.IOException: wrong value class: class org.apache.hadoop.io.DoubleWritable is not class RecordWritable
    at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:194)
    at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1562)
    at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1879)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
    at SuccessRate$JoinSumReducer.reduce(SuccessRate.java:86)
    at SuccessRate$JoinSumReducer.reduce(SuccessRate.java:66)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
    at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1900)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1662)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1505)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:735)
    at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:2076)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:809)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

First, you should uncomment those two lines in the body of your main method, because:

Calling job.setOutputKeyClass( NullWritable.class ); will set the types expected as output from both the map and reduce phases.

If your Mapper emits different types than the Reducer, you can set the types emitted by the mapper with the JobConf's setMapOutputKeyClass() and setMapOutputValueClass() methods. These implicitly set the input types expected by the Reducer.
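
Concretely, that means enabling the two lines you already have commented out, so that the map output types are declared separately from the job's final output types (a minimal sketch; these calls go inside your existing main):

// Types emitted by RecordMapper; they differ from the reducer's output
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(RecordWritable.class);
// Types emitted by JoinSumReducer, i.e. the job's final output
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);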

Second, the problem is that you do have to care about your combiner's input and output types.

Output types of a combiner must match output types of a mapper. Hadoop makes no guarantees on how many times the combiner is applied, or that it is even applied at all. And that's what happens in your case.
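
To make this concrete: a combiner for this job would have to be a Reducer whose input and output types are both the map output types. A hypothetical sketch, sitting alongside the other nested classes in SuccessRate (the name RecordCombiner is mine, and this identity combiner only illustrates the required signature):

public static class RecordCombiner
        extends Reducer<Text, RecordWritable, Text, RecordWritable> {
    @Override
    protected void reduce(Text key, Iterable<RecordWritable> values,
            Context context) throws IOException, InterruptedException {
        // RecordWritable wraps a single enum constant, so there is nothing
        // useful to pre-aggregate; pass every record through unchanged.
        for (RecordWritable val : values) {
            context.write(key, val);
        }
    }
}

Since such a combiner would do no useful work for this job, the simpler fix is not to set one at all.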

The workflow in MapReduce goes like this:

mapper(inputkey1, inputvalue1 -> outputkey1, outputvalue1) => combiner(outputkey1, outputvalue1 -> outputkey2, outputvalue2) => reducer(outputkey2, outputvalue2 -> outputkey3, outputvalue3)

Each key and value data type must match along this chain.
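
Instantiated with the types from your job, the chain reads as follows (the Stub* class names are only for illustration):

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// mapper:   (Object, Text)         -> (Text, RecordWritable)
class StubMapper extends Mapper<Object, Text, Text, RecordWritable> { }
// combiner: (Text, RecordWritable) -> (Text, RecordWritable), required
class StubCombiner extends Reducer<Text, RecordWritable, Text, RecordWritable> { }
// reducer:  (Text, RecordWritable) -> (Text, DoubleWritable)
class StubReducer extends Reducer<Text, RecordWritable, Text, DoubleWritable> { }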

Your error shows Caused by: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable

This happens because you set a combiner: job.setCombinerClass(JoinSumReducer.class).

The combiner's output does not match what your reducer expects as input. You can remove this line and retry.
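
Putting both fixes together, the main method would look roughly like this (a sketch: the combiner line is removed and the map output types are declared explicitly):

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "success rate");
    job.setJarByClass(SuccessRate.class);
    job.setMapperClass(RecordMapper.class);
    // No setCombinerClass(...): JoinSumReducer emits (Text, DoubleWritable),
    // which does not match the map output types (Text, RecordWritable).
    job.setReducerClass(JoinSumReducer.class);
    // The map output types differ from the final output types, so declare both.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(RecordWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}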