ClassNotFound Exception while running MapReduce program

I am writing a MapReduce program for matrix addition. Since it needs two input files, I used MultipleInputs. I have these classes:

MatAddMapper1.java

package mapred;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MatAddMapper1 extends Mapper<LongWritable, Text, Text, IntWritable> 
{
    //private static final int MISSING = 9999;
    @Override
    public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException
    {
        String line = value.toString();
        String[] content = line.split (" ");
        String key1 = content[0] + " " + content[1];
        int val = Integer.parseInt(content[2]);
        // Key is (i,j)
        context.write(new Text(key1), new IntWritable(val));
    }
}

MatAddMapper2.java is similar.
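For reference, a minimal sketch of what MatAddMapper2 presumably looks like, assuming it parses its input file exactly like MatAddMapper1 (the post only says the two are similar, so treat this as a reconstruction):

package mapred;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Assumed reconstruction: emits the same ("i j", value) pairs as
// MatAddMapper1, but is bound to the second input matrix.
public class MatAddMapper2 extends Mapper<LongWritable, Text, Text, IntWritable>
{
    @Override
    public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException
    {
        String line = value.toString();
        String[] content = line.split(" ");
        String key1 = content[0] + " " + content[1];
        int val = Integer.parseInt(content[2]);
        context.write(new Text(key1), new IntWritable(val));
    }
}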

MatAddReducer.java

package mapred;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MatAddReducer
extends Reducer<Text, IntWritable, Text, IntWritable> 
{
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException 
    {
        int val = 0;
        for (IntWritable value : values) 
        {
            val = val + value.get();
        }
        context.write(key, new IntWritable(val));
    }
}

MatAddApp.java (the main class)

package mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class MatAddApp extends Configured implements Tool
{


     public int run(String[] args) throws Exception 
     {
         Configuration conf = new Configuration();
         @SuppressWarnings("deprecation")
         Job job = new Job(conf, "Matrix Addition");
         job.setJarByClass(MatAddApp.class);
         MultipleInputs.addInputPath(job,new Path(args[0]),TextInputFormat.class,MatAddMapper1.class);
         MultipleInputs.addInputPath(job,new Path(args[1]),TextInputFormat.class,MatAddMapper2.class);

         FileOutputFormat.setOutputPath(job, new Path(args[2]));
         job.setReducerClass(MatAddReducer.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(IntWritable.class);

         return (job.waitForCompletion(true) ? 0 : 1);

     }

     public static void main(String[] args) throws Exception 
     {
         int ecode = ToolRunner.run(new MatAddApp(), args);
         System.exit(ecode);
     }

}

I am using Eclipse and have created a jar file, MatAddition.jar. M.txt and N.txt are the input matrices. When I try to run the program on my Hadoop cluster, I get the following error:

Exception in thread "main" java.lang.ClassNotFoundException: MatAddApp
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:278)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:214)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

The problem is the class name: the stack trace shows RunJar failing in Class.forName, which means Hadoop could not resolve the driver class it was given. Since MatAddApp lives in the mapred package, the driver class name should be fully qualified when set in the configuration, like this:

 job.setJarByClass(mapred.MatAddApp.class); 
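The same fully qualified name is needed when launching the job. A sketch of the invocation, assuming the jar is the MatAddition.jar from the post and the output directory is a placeholder:

hadoop jar MatAddition.jar mapred.MatAddApp M.txt N.txt output

For comparison, here is a complete example that reads both matrices from a single input file instead of using MultipleInputs: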

Input.txt

A,0|0,1.0
A,0|1,2.0
A,0|2,3.0
A,0|3,4.0
A,1|0,5.0
A,1|1,6.0
A,1|2,7.0
A,1|3,8.0
B,0|0,1.0
B,0|1,2.0
B,0|2,3.0
B,0|3,4.0
B,1|0,5.0
B,1|1,6.0
B,1|2,7.0
B,1|3,8.0

Here, the first column is the name of the matrix, the second column is the index, and the third column is the value.

MatrixAdd.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MatrixAdd {
    public static class MatMapper extends Mapper<Object, Text, Text, DoubleWritable> {
        private Text index = new Text();
        private final static DoubleWritable num = new DoubleWritable();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Each record looks like "A,0|1,2.0": matrix name, index, value.
            String record = value.toString();
            String[] parts = record.split(",");
            index.set(parts[1]);                   // key: the index, e.g. "0|1"
            num.set(Double.parseDouble(parts[2])); // value: the matrix entry
            context.write(index, num);
        }
    }

    public static class MatReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        private DoubleWritable result = new DoubleWritable();

        @Override
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            // Sum the entries of A and B that share the same index.
            double sumValue = 0;
            for (DoubleWritable val : values) {
                sumValue += val.get();
            }
            result.set(sumValue);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "matrix addition");
        job.setJarByClass(MatrixAdd.class);
        job.setMapperClass(MatMapper.class);
        // Addition is associative and commutative, so the reducer also works as a combiner.
        job.setCombinerClass(MatReducer.class);
        job.setReducerClass(MatReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Output:

0|0 2.0
0|1 4.0
0|2 6.0
0|3 8.0
1|0 10.0
1|1 12.0
1|2 14.0
1|3 16.0
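Each row is the element-wise sum of the A and B entries sharing the same index; for example, index 0|0 is 1.0 + 1.0 = 2.0.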