将参数传递给 mapreduce hadoop 中的记录 reader
passing arguments to record reader in mapreduce hadoop
这是我使用各种参数的代码
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.poi.hwpf.HWPFDocument;
import org.apache.poi.hwpf.extractor.WordExtractor;
public class Docsparser {
private static String Delimiter;
public static class DocsInputFormat extends FileInputFormat<Text, Text> {
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
return new DocsLineRecordReader();
}
}
public static class DocsLineRecordReader extends RecordReader<Text, Text> {
private Text key = new Text();
private Text value = new Text();
private int currentword = 0;
private String fileline;
private File file = null;
private String line;
private HWPFDocument document;
private WordExtractor extractor = null;
private String[] filedata;
StringBuilder sb = new StringBuilder();
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit) split;
final Path file = fileSplit.getPath();
Configuration conf = context.getConfiguration();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream filein = fs.open(fileSplit.getPath());
String Delim = conf.get("Delim");
if (filein != null)
{
HWPFDocument document = new HWPFDocument(filein);
extractor = new WordExtractor(document);
fileline = extractor.getText();
filedata = fileline.split(Delim);
}
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException
{
if (key == null) {
key = new Text();
}
if (value == null) {
value = new Text();
}
if(currentword < filedata.length)
{
for ( currentword=0;currentword < filedata.length; currentword++)
{
sb.append(filedata[currentword] +",");
line = sb.toString();
}
key.set(line);
value.set("");
return true;
}
else
{
key = null;
value = null;
return false;
}
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return (100.0f / filedata.length * currentword) / 100.0f;
}
@Override
public void close() throws IOException {
}
}
public static class Map extends Mapper<Text, Text, Text, Text>{
public void map(Text key, Text value, Context context) throws IOException, InterruptedException
{
context.write(key,value);
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
Job job = new Job(conf, "Docsparser");
job.setJarByClass(Docsparser.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setNumReduceTasks(0);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
Delimiter = args[2].toString();
conf.set("Delim",Delimiter);
job.setInputFormatClass(DocsInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
异常详情:
15/09/28 03:50:04 INFO mapreduce.Job: Task Id :
attempt_1443193152998_2319_m_000000_2, Status : FAILED Error: java.lang.NullPointerException
at java.lang.String.split(String.java:2272)
at java.lang.String.split(String.java:2355)
at com.nielsen.grfe.Docsparser$DocsLineRecordReader.initialize(Docsparser.java:66)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:548)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:786)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
NullPointerException
出现在fileline
字符串的split
方法中。我怀疑您没有设置 "Delim"
配置值,因此,您的变量 Delim
是 null
.
所有配置变量都必须在初始化 Job class 之前设置。移动
Delimiter = args[2].toString();
conf.set("Delim",Delimiter);
之前
Job job = new Job(conf, "Docsparser");
这是我使用各种参数的代码
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.poi.hwpf.HWPFDocument;
import org.apache.poi.hwpf.extractor.WordExtractor;
public class Docsparser {
private static String Delimiter;
public static class DocsInputFormat extends FileInputFormat<Text, Text> {
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
return new DocsLineRecordReader();
}
}
public static class DocsLineRecordReader extends RecordReader<Text, Text> {
private Text key = new Text();
private Text value = new Text();
private int currentword = 0;
private String fileline;
private File file = null;
private String line;
private HWPFDocument document;
private WordExtractor extractor = null;
private String[] filedata;
StringBuilder sb = new StringBuilder();
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit) split;
final Path file = fileSplit.getPath();
Configuration conf = context.getConfiguration();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream filein = fs.open(fileSplit.getPath());
String Delim = conf.get("Delim");
if (filein != null)
{
HWPFDocument document = new HWPFDocument(filein);
extractor = new WordExtractor(document);
fileline = extractor.getText();
filedata = fileline.split(Delim);
}
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException
{
if (key == null) {
key = new Text();
}
if (value == null) {
value = new Text();
}
if(currentword < filedata.length)
{
for ( currentword=0;currentword < filedata.length; currentword++)
{
sb.append(filedata[currentword] +",");
line = sb.toString();
}
key.set(line);
value.set("");
return true;
}
else
{
key = null;
value = null;
return false;
}
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return (100.0f / filedata.length * currentword) / 100.0f;
}
@Override
public void close() throws IOException {
}
}
public static class Map extends Mapper<Text, Text, Text, Text>{
public void map(Text key, Text value, Context context) throws IOException, InterruptedException
{
context.write(key,value);
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
Job job = new Job(conf, "Docsparser");
job.setJarByClass(Docsparser.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setNumReduceTasks(0);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
Delimiter = args[2].toString();
conf.set("Delim",Delimiter);
job.setInputFormatClass(DocsInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
异常详情:
15/09/28 03:50:04 INFO mapreduce.Job: Task Id : attempt_1443193152998_2319_m_000000_2, Status : FAILED Error: java.lang.NullPointerException at java.lang.String.split(String.java:2272) at java.lang.String.split(String.java:2355) at com.nielsen.grfe.Docsparser$DocsLineRecordReader.initialize(Docsparser.java:66) at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:548) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:786) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:163) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
NullPointerException
出现在fileline
字符串的split
方法中。我怀疑您没有设置 "Delim"
配置值,因此,您的变量 Delim
是 null
.
所有配置变量都必须在初始化 Job class 之前设置。移动
Delimiter = args[2].toString();
conf.set("Delim",Delimiter);
之前
Job job = new Job(conf, "Docsparser");