IOException:映射中的键类型不匹配:文本,收到 LongWritable

IOException: Type mismatch in key from map: Text, received LongWritable

我了解到这个话题过去已经讨论过。但不幸的是我没能解决这个问题。我不断收到相同的 IOException 错误。我是 Java 和 Hadoop 的新手,这是我第一次尝试 WordCount 练习。对于任何语法错误或格式问题,我深表歉意。请让我知道我哪里出错了。

Error: java.lang.Exception: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable

这是我的代码:

MyDriver

package p1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyDriver {
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        // Create a configuration class pointing to default configuration
        Configuration conf = new Configuration();

        // Create an object of Job by specifying conf object
        Job job = Job.getInstance(conf, "MyWordCountJob");

        // Link your Driver Class with the Job
        job.setJarByClass(MyDriver.class);

        // Link your Mapper Class with the Job
        job.setJarByClass(MyMapper.class);

        // Link your Reducer Class with the Job
        job.setJarByClass(MyReducer.class);

        // set final output Key
        job.setOutputKeyClass(Text.class);

        // set final output Value
        job.setOutputValueClass(IntWritable.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Defining input Paths
        Path input_dir = new Path("hdfs://localhost:9000/input_data/");
        FileInputFormat.addInputPath(job, input_dir);

        // Defining Output Paths
        Path output_dir = new Path("hdfs://localhost:9000/output_data/");
        FileOutputFormat.setOutputPath(job, output_dir);

        // This piece of code will actually initiate the Job run
        // boolean flag = job.waitForCompletion(true);
        System.out.println("is this running");

        // system.exit means it will kill the jvm and terminate the program.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        System.out.println("job.waitForCompletion(true)"
                + job.waitForCompletion(true));

    }
}

package p1;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {

        String currentline = line.toString();
        System.out.println("MyMapper.map():Offset" + offset
                + " :: CurrentLine=" + currentline);

        // apple apple ball--this is read when the mapper reads the data for the
        // first time/ mapper reads line by line.
        System.out.println(currentline);
        String words[] = currentline.split(" ");

        for (String word : words) {
            System.out.println(" words " + word);
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

MyReducer

package p1;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    //apple [1,1,1]

    @Override
    protected void reduce(Text word, Iterable<IntWritable> value, Context ctx)
            throws IOException, InterruptedException {

        System.out.println("Key=" + word);

        Iterator<IntWritable> it = value.iterator();

        int count = 0;
        while (it.hasNext()) {
            IntWritable i = (IntWritable) it.next();
            count = count + i.get();
        }

        ctx.write(word, new IntWritable(count));
    }

}

问题是您的 Map 键的类型检查问题(在几乎所有实际情况下,您都不需要在 map 阶段修改键类型)。试试这个:

public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

    @Override
    protected void map(Object offset, Text line, Context context)
        throws IOException, InterruptedException {

使用

job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);

而不是

job.setJarByClass(MyMapper.class);
job.setJarByClass(MyReducer.class);