java.lang.NullPointerException at org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.close

I am running two chained MapReduce jobs: the output of the first job is used as the input of the second. For this I call job.setOutputFormatClass(SequenceFileOutputFormat.class) on the first job, and I run the following driver class:

package org;

import org.apache.commons.configuration.ConfigurationFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

public class Driver1 extends Configured implements Tool {

  public int run(String[] args) throws Exception {
    if (args.length != 3) {
      System.err.println("Usage: Driver1 <input path> <output path 1> <output path 2>");
      System.exit(-1);
    }
    //ConfFactory WorkFlow=new ConfFactory(new Path("/input.txt"),new Path("/output.txt"),TextInputFormat.class,VarLongWritable.class,Text.class,VarLongWritable.class,VectorWritable.class,SequenceFileOutputFormat.class);

    // First job: reads the raw text input, writes a SequenceFile.
    Job job = new Job();
    job.setJarByClass(Driver1.class);
    job.setJobName("Max Temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(UserVectorMapper.class);
    job.setReducerClass(UserVectorReducer.class);

    job.setOutputKeyClass(VarLongWritable.class);
    job.setOutputValueClass(VectorWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    // Second job: reads the first job's SequenceFile output.
    Job job1 = new Job();
    job1.setJarByClass(Driver1.class);
    job1.setInputFormatClass(SequenceFileInputFormat.class);

    FileInputFormat.addInputPath(job1, new Path("output/part-r-00000"));
    FileOutputFormat.setOutputPath(job1, new Path(args[2]));

    job1.setMapperClass(ItemToItemPrefMapper.class);
    //job1.setReducerClass(UserVectorReducer.class);

    job1.setOutputKeyClass(VectorWritable.class);
    job1.setOutputValueClass(VectorWritable.class);
    job1.setOutputFormatClass(SequenceFileOutputFormat.class);

    // job1 only starts after job has finished successfully.
    return job.waitForCompletion(true) && job1.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    Driver1 driver = new Driver1();
    int exitCode = ToolRunner.run(driver, args);
    System.exit(exitCode);
  }
}

I get the following runtime log:

15/02/24 20:00:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/02/24 20:00:49 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/02/24 20:00:49 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
15/02/24 20:00:49 INFO input.FileInputFormat: Total input paths to process : 1
15/02/24 20:00:49 WARN snappy.LoadSnappy: Snappy native library not loaded
15/02/24 20:00:49 INFO mapred.JobClient: Running job: job_local1723586736_0001
15/02/24 20:00:49 INFO mapred.LocalJobRunner: Waiting for map tasks
15/02/24 20:00:49 INFO mapred.LocalJobRunner: Starting task: attempt_local1723586736_0001_m_000000_0
15/02/24 20:00:49 INFO util.ProcessTree: setsid exited with exit code 0
15/02/24 20:00:49 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1185f32
15/02/24 20:00:49 INFO mapred.MapTask: Processing split: file:/home/smaiti/workspace/recommendationsy/data.txt:0+1979173
15/02/24 20:00:50 INFO mapred.MapTask: io.sort.mb = 100
15/02/24 20:00:50 INFO mapred.MapTask: data buffer = 79691776/99614720
15/02/24 20:00:50 INFO mapred.MapTask: record buffer = 262144/327680
15/02/24 20:00:50 INFO mapred.JobClient:  map 0% reduce 0%
15/02/24 20:00:50 INFO mapred.MapTask: Starting flush of map output
15/02/24 20:00:51 INFO mapred.MapTask: Finished spill 0
15/02/24 20:00:51 INFO mapred.Task: Task:attempt_local1723586736_0001_m_000000_0 is done. And is in the process of commiting
15/02/24 20:00:51 INFO mapred.LocalJobRunner: 
15/02/24 20:00:51 INFO mapred.Task: Task 'attempt_local1723586736_0001_m_000000_0' done.
15/02/24 20:00:51 INFO mapred.LocalJobRunner: Finishing task: attempt_local1723586736_0001_m_000000_0
15/02/24 20:00:51 INFO mapred.LocalJobRunner: Map task executor complete.
15/02/24 20:00:51 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@9cce9
15/02/24 20:00:51 INFO mapred.LocalJobRunner: 
15/02/24 20:00:51 INFO mapred.Merger: Merging 1 sorted segments
15/02/24 20:00:51 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 2074779 bytes
15/02/24 20:00:51 INFO mapred.LocalJobRunner: 
15/02/24 20:00:51 INFO mapred.Task: Task:attempt_local1723586736_0001_r_000000_0 is done. And is in the process of commiting
15/02/24 20:00:51 INFO mapred.LocalJobRunner: 
15/02/24 20:00:51 INFO mapred.Task: Task attempt_local1723586736_0001_r_000000_0 is allowed to commit now
15/02/24 20:00:51 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1723586736_0001_r_000000_0' to output
15/02/24 20:00:51 INFO mapred.LocalJobRunner: reduce > reduce
15/02/24 20:00:51 INFO mapred.Task: Task 'attempt_local1723586736_0001_r_000000_0' done.
15/02/24 20:00:51 INFO mapred.JobClient:  map 100% reduce 100%
15/02/24 20:00:51 INFO mapred.JobClient: Job complete: job_local1723586736_0001
15/02/24 20:00:51 INFO mapred.JobClient: Counters: 20
15/02/24 20:00:51 INFO mapred.JobClient:   File Output Format Counters 
15/02/24 20:00:51 INFO mapred.JobClient:     Bytes Written=1012481
15/02/24 20:00:51 INFO mapred.JobClient:   File Input Format Counters 
15/02/24 20:00:51 INFO mapred.JobClient:     Bytes Read=1979173
15/02/24 20:00:51 INFO mapred.JobClient:   FileSystemCounters
15/02/24 20:00:51 INFO mapred.JobClient:     FILE_BYTES_READ=6033479
15/02/24 20:00:51 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=5264031
15/02/24 20:00:51 INFO mapred.JobClient:   Map-Reduce Framework
15/02/24 20:00:51 INFO mapred.JobClient:     Reduce input groups=943
15/02/24 20:00:51 INFO mapred.JobClient:     Map output materialized bytes=2074783
15/02/24 20:00:51 INFO mapred.JobClient:     Combine output records=0
15/02/24 20:00:51 INFO mapred.JobClient:     Map input records=100000
15/02/24 20:00:51 INFO mapred.JobClient:     Reduce shuffle bytes=0
15/02/24 20:00:51 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
15/02/24 20:00:51 INFO mapred.JobClient:     Reduce output records=943
15/02/24 20:00:51 INFO mapred.JobClient:     Spilled Records=200000
15/02/24 20:00:51 INFO mapred.JobClient:     Map output bytes=1874777
15/02/24 20:00:51 INFO mapred.JobClient:     Total committed heap usage (bytes)=415760384
15/02/24 20:00:51 INFO mapred.JobClient:     CPU time spent (ms)=0
15/02/24 20:00:51 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
15/02/24 20:00:51 INFO mapred.JobClient:     SPLIT_RAW_BYTES=118
15/02/24 20:00:51 INFO mapred.JobClient:     Map output records=100000
15/02/24 20:00:51 INFO mapred.JobClient:     Combine input records=0
15/02/24 20:00:51 INFO mapred.JobClient:     Reduce input records=100000
15/02/24 20:00:51 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/02/24 20:00:51 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
15/02/24 20:00:51 INFO input.FileInputFormat: Total input paths to process : 1
15/02/24 20:00:51 INFO mapred.JobClient: Running job: job_local735350013_0002
15/02/24 20:00:51 INFO mapred.LocalJobRunner: Waiting for map tasks
15/02/24 20:00:51 INFO mapred.LocalJobRunner: Starting task: attempt_local735350013_0002_m_000000_0
15/02/24 20:00:51 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1a970
15/02/24 20:00:51 INFO mapred.MapTask: Processing split: file:/home/smaiti/workspace/recommendationsy/output/part-r-00000:0+1004621
15/02/24 20:00:51 INFO mapred.MapTask: io.sort.mb = 100
15/02/24 20:00:51 INFO mapred.MapTask: data buffer = 79691776/99614720
15/02/24 20:00:51 INFO mapred.MapTask: record buffer = 262144/327680
15/02/24 20:00:51 INFO mapred.MapTask: Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader@9cc591
java.lang.NullPointerException
 at org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.close(SequenceFileRecordReader.java:101)
 at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.close(MapTask.java:496)
 at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:1776)
 at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:778)
 at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
 at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
15/02/24 20:00:51 INFO mapred.LocalJobRunner: Map task executor complete.
15/02/24 20:00:51 WARN mapred.LocalJobRunner: job_local735350013_0002
java.lang.Exception: java.lang.ClassCastException: class org.apache.mahout.math.VectorWritable
 at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
Caused by: java.lang.ClassCastException: class org.apache.mahout.math.VectorWritable
 at java.lang.Class.asSubclass(Class.java:3208)
 at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:795)
 at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:964)
 at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:673)
 at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:756)
 at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
 at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
15/02/24 20:00:52 INFO mapred.JobClient:  map 0% reduce 0%
15/02/24 20:00:52 INFO mapred.JobClient: Job complete: job_local735350013_0002
15/02/24 20:00:52 INFO mapred.JobClient: Counters: 0

The first exception I get is: java.lang.NullPointerException at org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.close(SequenceFileRecordReader.java:101)

Please help.

This is mainly because Hadoop gets confused when serializing the data between the two jobs.

Make sure that:

  1. You set the input and output format classes on both jobs.
  2. The input format of the second job is the same as the output format of the first.
  3. The intermediate file format is what the record reader expects. Note that the NullPointerException in SequenceFileRecordReader.close is only a secondary symptom: it is thrown while closing a reader that was never initialized, because the map task had already failed with the ClassCastException.
  4. You keep the file formats consistent throughout the program, as sketched below.
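For reference, here is a minimal sketch of that wiring, reusing the question's UserVectorMapper, UserVectorReducer and ItemToItemPrefMapper. One assumption is flagged inline: the second job's output key is swapped from VectorWritable to VarLongWritable as a stand-in, because map output keys must implement WritableComparable and VectorWritable does not; that is exactly what the ClassCastException in the log is about. The real key type has to match whatever ItemToItemPrefMapper actually emits.

// Sketch only; same imports as the question (TextInputFormat is already imported there).
public int run(String[] args) throws Exception {
  // --- Job 1: plain text in, SequenceFile out ---
  Job first = new Job(getConf(), "user vectors");
  first.setJarByClass(Driver1.class);
  first.setInputFormatClass(TextInputFormat.class);            // stated explicitly rather than implied
  first.setOutputFormatClass(SequenceFileOutputFormat.class);  // preserves <VarLongWritable, VectorWritable> pairs
  FileInputFormat.addInputPath(first, new Path(args[0]));
  FileOutputFormat.setOutputPath(first, new Path(args[1]));
  first.setMapperClass(UserVectorMapper.class);
  first.setReducerClass(UserVectorReducer.class);
  first.setOutputKeyClass(VarLongWritable.class);
  first.setOutputValueClass(VectorWritable.class);

  // Run job 1 to completion before job 2 reads its output.
  if (!first.waitForCompletion(true)) {
    return 1;
  }

  // --- Job 2: SequenceFile in, SequenceFile out ---
  Job second = new Job(getConf(), "item-to-item prefs");
  second.setJarByClass(Driver1.class);
  second.setInputFormatClass(SequenceFileInputFormat.class);   // must match job 1's output format
  second.setOutputFormatClass(SequenceFileOutputFormat.class);
  FileInputFormat.addInputPath(second, new Path(args[1]));     // the whole output dir, not one part file
  FileOutputFormat.setOutputPath(second, new Path(args[2]));
  second.setMapperClass(ItemToItemPrefMapper.class);
  second.setOutputKeyClass(VarLongWritable.class);             // assumption: a stand-in WritableComparable key;
  second.setOutputValueClass(VectorWritable.class);            // VectorWritable cannot be a key
  return second.waitForCompletion(true) ? 0 : 1;
}

Pointing job 2 at the whole output directory works because FileInputFormat skips files whose names start with _ or ., so _SUCCESS and _logs are ignored; hardcoding output/part-r-00000 breaks as soon as the first job runs with more than one reducer.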