在 EMR 上找不到类 Main$MapClass

Class Main$MapClass not found on EMR

我正在尝试在 EMR(亚马逊)上运行一个已经在本地计算机上测试通过的 map-reduce 作业,但出现以下错误:

Error: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class Main$MapClass not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1720)
at org.apache.hadoop.mapreduce.task.JobContextImpl.getMapperClass(JobContextImpl.java:186)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:733)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:162)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
Caused by: java.lang.ClassNotFoundException: Class Main$MapClass not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1626)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1718)
... 8 more

这是定义作业配置的 main 函数:

    /**
     * Configures and runs the first MapReduce job ("words count").
     *
     * <p>Expected arguments: {@code <inputLocation> <outputLocation> <includeStopWords>}.
     * The third argument is forwarded to the tasks through the job {@link Configuration}
     * under the key {@code "includeStopWords"}.
     *
     * <p>NOTE(review): a {@code ClassNotFoundException} for {@code Main$MapClass} on the
     * cluster means the task JVMs cannot load the mapper from the submitted jar.
     * {@code setJarByClass(Main.class)} selects the jar that {@code Main} was loaded from, so
     * {@code MapClass}/{@code ReduceClass} must be packaged in that same jar. Putting these
     * classes in a named package as public static nested (or top-level) classes is the usual
     * remedy. Confirm this against the project layout.
     *
     * @param args command-line arguments as described above
     * @throws IllegalArgumentException if fewer than three arguments are supplied
     * @throws Exception if job setup, submission, or execution fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            // Always report the count; the previous per-argument loop printed nothing
            // at all when no arguments were passed.
            System.out.println("Missing Args!!\n" + "Number of args: " + args.length);
            for (int i = 0; i < args.length; i++) {
                System.out.println(" args[" + i + "]:" + args[i]);
            }
            throw new IllegalArgumentException(
                    "Expected 3 arguments: <inputLocation> <outputLocation> <includeStopWords>, got "
                            + args.length);
        }
        String inputLocation = args[0];
        String outputLocation = args[1];
        String includeStopWords = args[2];

        // First job: count the 2-gram words by decade.
        Configuration conf = new Configuration();
        conf.set("includeStopWords", includeStopWords);
        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(conf, "words count");
        System.out.println("before set classes:");
        // Identifies the job jar as the one Main was loaded from.
        job.setJarByClass(Main.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);
        System.out.println("after setting classes.");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // NOTE(review): if the input corpus is stored as SequenceFiles, enable:
        // job.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(inputLocation));
        FileOutputFormat.setOutputPath(job, new Path(outputLocation));
        System.out.println("before wait to complition");
        boolean succeeded = job.waitForCompletion(true);
        // Printed before exiting: previously this line followed System.exit and never ran.
        System.out.println("after wait to completion");
        System.exit(succeeded ? 0 : 1);
    }

在 EMR 上启动作业的 Runner 代码如下:

/**
 * Launches an Amazon EMR job flow that runs three chained Hadoop jar steps stored in S3.
 *
 * <p>Arguments: {@code <minPmi> <relMinPmi> <language> <includeStopWords>}. A language of
 * {@code "heb"} (case-insensitive) selects the Hebrew corpus; any other value selects the
 * English one. AWS credentials are read from {@code credentials.properties} in the working
 * directory.
 */
public class Runner {
    public static Logger logger = LogManager.getRootLogger();

    /**
     * Builds the three-step job flow and submits it to EMR, printing the job flow id.
     *
     * @param args command-line arguments as described on the class
     * @throws IllegalArgumentException if fewer than four arguments are supplied
     * @throws IOException if the credentials file cannot be read
     */
    public static void main(String[] args) throws IOException {
        if (args.length < 4) {
            System.out.println("Missing Arguments!");
            throw new IllegalArgumentException(
                    "Expected 4 arguments: <minPmi> <relMinPmi> <language> <includeStopWords>, got "
                            + args.length);
        }
        String minPmi = args[0];
        String relMinPmi = args[1];
        String language = args[2];
        String includeStopWords = args[3];

        // Jobs output locations
        String firstOutput = "s3n://dsp152ass2/outputs/first";
        String secondOutput = "s3n://dsp152ass2/outputs/second";
        String thirdOutput = "s3n://dsp152ass2/outputs/third";

        // Jobs jar locations
        String firstJobJar = "s3n://dsp152ass2/jars/firstJob.jar";
        String secondJobJar = "s3n://dsp152ass2/jars/secondJob.jar";
        String thirdJobJar = "s3n://dsp152ass2/jars/thirdJob.jar";

        // Select the input corpus by language argument; English unless "heb" is given.
        String corpus = "s3n://dsp152/output/eng-us-all-100k-2gram"; // TODO: change to the real input
        if (language.equalsIgnoreCase("heb")) {
            corpus = "s3n://dsp152/output/heb-all-100k-2gram";
        }

        // Create the EMR client. Passing the File (rather than an inline FileInputStream)
        // means no stream is opened here that this code would be responsible for closing.
        AWSCredentials credentials = new PropertiesCredentials(new File("credentials.properties"));
        AmazonElasticMapReduce mapReduce = new AmazonElasticMapReduceClient(credentials);

        // Define the Hadoop jar steps.
        // NOTE(review): withMainClass is not set, so presumably each jar's manifest must
        // declare its Main-Class; confirm this when debugging class-loading errors.
        // Jobs 2 and 3 read only part-r-00000 of the previous output, which assumes the
        // previous job ran with a single reducer.
        HadoopJarStepConfig firstJobConfig = new HadoopJarStepConfig()
                .withJar(firstJobJar)
                .withArgs(corpus, firstOutput, includeStopWords);

        HadoopJarStepConfig secondJobConfig = new HadoopJarStepConfig()
                .withJar(secondJobJar)
                .withArgs(firstOutput + "/part-r-00000", secondOutput);

        HadoopJarStepConfig thirdJobConfig = new HadoopJarStepConfig()
                .withJar(thirdJobJar)
                .withArgs(secondOutput + "/part-r-00000", thirdOutput, minPmi, relMinPmi);

        // Define the steps; any failure terminates the whole job flow.
        StepConfig firstJobStep = terminatingStep("firstJobStep", firstJobConfig);
        StepConfig secondJobStep = terminatingStep("secondJobStep", secondJobConfig);
        StepConfig thirdJobStep = terminatingStep("thirdJobStep", thirdJobConfig);

        // Define the cluster.
        JobFlowInstancesConfig instances = new JobFlowInstancesConfig()
                .withInstanceCount(1) // TODO: change to 2 - 10
                .withMasterInstanceType(InstanceType.M1Large.toString())
                .withSlaveInstanceType(InstanceType.M1Large.toString())
                .withHadoopVersion("2.2.0")
                .withEc2KeyName("dsp152ass2")
                .withKeepJobFlowAliveWhenNoSteps(false)
                .withPlacement(new PlacementType("us-east-1b"));

        // Define the run request.
        RunJobFlowRequest runFlowRequest = new RunJobFlowRequest()
                .withName("DSPextractCollections")
                .withInstances(instances)
                .withJobFlowRole("EMR_EC2_DefaultRole")
                .withServiceRole("EMR_DefaultRole")
                .withSteps(firstJobStep, secondJobStep, thirdJobStep)
                .withLogUri("s3n://dsp152ass2/logs/");

        // Submit the job flow.
        RunJobFlowResult runJobFlowResult = mapReduce.runJobFlow(runFlowRequest);
        String jobFlowId = runJobFlowResult.getJobFlowId();
        System.out.println("### WORKFLOW SUCCESSFULLY ADDED: \n" + "\t" + jobFlowId);
    }

    /** Wraps a jar step config in a named step whose failure terminates the job flow. */
    private static StepConfig terminatingStep(String name, HadoopJarStepConfig jarStep) {
        return new StepConfig()
                .withName(name)
                .withHadoopJarStep(jarStep)
                .withActionOnFailure("TERMINATE_JOB_FLOW");
    }
}

我的项目结构:(原帖此处为项目结构截图,本文中未包含)

在此先感谢您的帮助。

以下步骤可以解决类解析(class resolution)问题:

  1. 将 Mapper 和 Reducer 分别创建为独立的类。
  2. 为这些类创建一个包并使用它,而不是使用默认包(例如 foo.Main、foo.MapClass ...)。
  3. 在 Eclipse 中导出 JAR 时,请尝试使用 "Extract required libraries into generated JAR" 选项,而不是 "Package required libraries into generated JAR"。这可能会解决类找不到的问题(请确保复制了 Eclipse 生成的所有内容)。

提示:EMR 有一些特定设置,与在本地(可能是伪分布式)部署上运行时不同。请按照 AWS 的官方指南进行配置。

请确保这些设置都是正确的。