Passing arguments to record reader in MapReduce Hadoop

Here is my code, in which I pass several arguments to the job:

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.poi.hwpf.HWPFDocument;
import org.apache.poi.hwpf.extractor.WordExtractor;



public class Docsparser {
      private static String Delimiter;

    public static class DocsInputFormat extends FileInputFormat<Text, Text> {

          @Override
          public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
              return new DocsLineRecordReader();
          }
    }

          public static  class DocsLineRecordReader extends RecordReader<Text, Text> {

              private Text key = new Text();
              private Text value = new Text();
              private int currentword = 0;
              private String fileline;
              private File file = null;
              private  String line; 
              private HWPFDocument document;
              private WordExtractor extractor = null;
              private String[] filedata;
              StringBuilder sb = new StringBuilder();

              @Override
              public void initialize(InputSplit split, TaskAttemptContext context)
                      throws IOException, InterruptedException {

                  FileSplit fileSplit = (FileSplit) split;
                  final Path file = fileSplit.getPath();
                  Configuration conf = context.getConfiguration();
                  FileSystem fs = file.getFileSystem(conf);
                  FSDataInputStream filein = fs.open(file);

                  String Delim = conf.get("Delim");
                  if (filein != null) {
                      HWPFDocument document = new HWPFDocument(filein);
                      extractor = new WordExtractor(document);
                      fileline = extractor.getText();

                      filedata = fileline.split(Delim);
                  }
              }


              @Override
              public boolean nextKeyValue() throws IOException, InterruptedException
              {

                  if (key == null) {
                      key = new Text();
                  }

                  if (value == null) {
                      value = new Text();
                  } 
                  if (currentword < filedata.length) {
                      for (currentword = 0; currentword < filedata.length; currentword++) {
                          sb.append(filedata[currentword] + ",");
                          line = sb.toString();
                      }

                      key.set(line);
                      value.set("");
                      return true;
                  } else {
                      key = null;
                      value = null;
                      return false;
                  }

              }

              @Override
              public Text getCurrentKey() throws IOException, InterruptedException {
                  return key;
              }

              @Override
              public Text getCurrentValue() throws IOException, InterruptedException {
                  return value;
              }

              @Override
              public float getProgress() throws IOException, InterruptedException {
                  return (float) currentword / filedata.length;
              }

             @Override
              public void close() throws IOException {

                }
             }


    public static class Map extends Mapper<Text, Text, Text, Text>{

        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = new Job(conf, "Docsparser");
        job.setJarByClass(Docsparser.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Map.class);
        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        Delimiter = args[2].toString();
        conf.set("Delim", Delimiter);

        job.setInputFormatClass(DocsInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

Exception details:

15/09/28 03:50:04 INFO mapreduce.Job: Task Id : attempt_1443193152998_2319_m_000000_2, Status : FAILED
Error: java.lang.NullPointerException
    at java.lang.String.split(String.java:2272)
    at java.lang.String.split(String.java:2355)
    at com.nielsen.grfe.Docsparser$DocsLineRecordReader.initialize(Docsparser.java:66)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:548)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:786)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

The NullPointerException occurs in the split method called on the fileline string. I suspect you never set the "Delim" configuration value, so your variable Delim is null.
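
As a safety net (a sketch, not part of your original code): Configuration.get has an overload that takes a default value and never returns null, so the split cannot throw even if "Delim" was never set. The "," fallback here is an arbitrary illustrative choice:

    // Inside initialize(): fall back to a default delimiter if "Delim"
    // is missing from the configuration ("," is just an example default).
    String Delim = conf.get("Delim", ",");
    filedata = fileline.split(Delim);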

The root cause, however, is ordering: all configuration variables must be set before the Job object is created, because the Job constructor takes its own copy of the Configuration, so later conf.set(...) calls never reach the job. Move

    Delimiter = args[2].toString(); 
    conf.set("Delim",Delimiter);

before

    Job job = new Job(conf, "Docsparser");
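
Applied to your main(), the corrected ordering looks like this (a minimal sketch; the rest of the setup stays exactly as in your code):

    Configuration conf = new Configuration();

    // Set every configuration value first: Job copies conf on
    // construction, so later conf.set(...) calls are invisible to tasks.
    Delimiter = args[2].toString();
    conf.set("Delim", Delimiter);

    Job job = new Job(conf, "Docsparser");
    job.setJarByClass(Docsparser.class);
    // ... remaining job setup unchanged ...

Alternatively, keep your current ordering and write through the copy the job actually uses: job.getConfiguration().set("Delim", Delimiter).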