Hadoop:带有自定义输入格式的 NullPointerException
Hadoop: NullPointerException with Custom InputFormat
我已经为 Hadoop 开发了一个自定义 InputFormat
(包括一个自定义 InputSplit
和一个自定义 RecordReader
),我正在经历一个罕见的 NullPointerException
。
这些 类 将用于查询公开 REST API 用于记录检索的第三方系统。因此,我在 DBInputFormat
中得到了灵感,它也是一个非 HDFS InputFormat
。
我得到的错误如下:
Error: java.lang.NullPointerException at
org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:524)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:762)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:162)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
我搜索了 MapTask
(Hadoop 2.1.0 版本)的代码,发现有问题的部分是 RecordReader
:
的初始化
472 NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
473 org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
474 TaskReporter reporter,
475 org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
476 throws InterruptedException, IOException {
...
491 this.real = inputFormat.createRecordReader(split, taskContext);
...
494 }
...
519 @Override
520 public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
521 org.apache.hadoop.mapreduce.TaskAttemptContext context
522 ) throws IOException, InterruptedException {
523 long bytesInPrev = getInputBytes(fsStats);
524 real.initialize(split, context);
525 long bytesInCurr = getInputBytes(fsStats);
526 fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
527 }
当然是我代码的相关部分:
# MyInputFormat.java
public static void setEnvironmnet(Job job, String host, String port, boolean ssl, String APIKey) {
backend = new Backend(host, port, ssl, APIKey);
}
public static void addResId(Job job, String resId) {
Configuration conf = job.getConfiguration();
String inputs = conf.get(INPUT_RES_IDS, "");
if (inputs.isEmpty()) {
inputs += restId;
} else {
inputs += "," + resId;
}
conf.set(INPUT_RES_IDS, inputs);
}
@Override
public List<InputSplit> getSplits(JobContext job) {
// resulting splits container
List<InputSplit> splits = new ArrayList<InputSplit>();
// get the Job configuration
Configuration conf = job.getConfiguration();
// get the inputs, i.e. the list of resource IDs
String input = conf.get(INPUT_RES_IDS, "");
String[] resIDs = StringUtils.split(input);
// iterate on the resIDs
for (String resID: resIDs) {
splits.addAll(getSplitsResId(resID, job.getConfiguration()));
}
// return the splits
return splits;
}
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
if (backend == null) {
logger.info("Unable to create a MyRecordReader, it seems the environment was not properly set");
return null;
}
// create a record reader
return new MyRecordReader(backend, split, context);
}
# MyRecordReader.java
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
// get start, end and current positions
MyInputSplit inputSplit = (MyInputSplit) this.split;
start = inputSplit.getFirstRecordIndex();
end = start + inputSplit.getLength();
current = 0;
// query the third-party system for the related resource, seeking to the start of the split
records = backend.getRecords(inputSplit.getResId(), start, end);
}
# MapReduceTest.java
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MapReduceTest(), args);
System.exit(res);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
Job job = Job.getInstance(conf, "MapReduce test");
job.setJarByClass(MapReduceTest.class);
job.setMapperClass(MyMap.class);
job.setCombinerClass(MyReducer.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(MyInputFormat.class);
MyInputFormat.addInput(job, "ca73a799-9c71-4618-806e-7bd0ca1911f4");
InputFormat.setEnvironmnet(job, "my.host.com", "443", true, "my_api_key");
FileOutputFormat.setOutputPath(job, new Path(args[0]));
return job.waitForCompletion(true) ? 0 : 1;
}
有什么问题吗?
顺便说一句,"good" InputSplit
RecordReader
必须使用哪个,给构造函数的那个还是initialize
方法中给的那个?不管怎样,我已经尝试了这两个选项,结果错误是一样的:)
我读取你的 strack trace real
的方式在第 524 行为空。
但是不要相信我的话。在那里输入 assert
或 system.out.println
并自己检查 real
的值。
NullPointerException
几乎总是意味着你点掉了一些你不希望为空的东西。一些图书馆和馆藏会把它扔给你作为他们说 "this can't be null".
的方式
Error: java.lang.NullPointerException at
org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:524)
对我来说,这读作:在 org.apache.hadoop.mapred
包中,MapTask
class 有一个内部 class NewTrackingRecordReader
和 initialize
在第 524 行抛出 NullPointerException
的方法。
524 real.initialize( blah, blah) // I actually stopped reading after the dot
this.real
在第 491 行设置。
491 this.real = inputFormat.createRecordReader(split, taskContext);
假设您没有遗漏范围更小的 real
来掩盖 this.real
那么我们需要查看 inputFormat.createRecordReader(split, taskContext);
如果可以 return null
那么它可能是罪魁祸首。
当 backend
为 null 时,它会 return null
。
@Override
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit split,
TaskAttemptContext context) {
if (backend == null) {
logger.info("Unable to create a MyRecordReader, " +
"it seems the environment was not properly set");
return null;
}
// create a record reader
return new MyRecordReader(backend, split, context);
}
看起来 setEnvironmnet
应该设置 backend
# MyInputFormat.java
public static void setEnvironmnet(
Job job,
String host,
String port,
boolean ssl,
String APIKey) {
backend = new Backend(host, port, ssl, APIKey);
}
backend
必须在 setEnvironment
之外的某处声明(否则您会遇到编译器错误)。
如果 backend
在构造时没有被设置为非空的东西并且 setEnvironmnet
在 createRecordReader
之前没有被调用那么你应该期望得到 NullPointerException
你明白了。
更新:
如您所述,由于 setEnvironmnet()
是静态的 backend
也必须是静态的。这意味着您必须确保其他实例没有将其设置为空。
已解决。问题是 backend
变量被声明为 static
,即它 属于 到 java class,因此任何其他对象改变该变量(例如 null
)会影响同一 class.
的所有其他对象
现在,setEnvironment
添加主机、端口、ssl 使用和 API 键作为配置(与 setResId
已经对资源 ID 所做的相同);当调用 createRecordReader
时,会获取此配置并创建 backend
对象。
感谢 CandiedOrange 让我走上了正确的道路!
我已经为 Hadoop 开发了一个自定义 InputFormat
(包括一个自定义 InputSplit
和一个自定义 RecordReader
),我正在经历一个罕见的 NullPointerException
。
这些 类 将用于查询公开 REST API 用于记录检索的第三方系统。因此,我在 DBInputFormat
中得到了灵感,它也是一个非 HDFS InputFormat
。
我得到的错误如下:
Error: java.lang.NullPointerException at
org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:524)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:762)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:162)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
我搜索了 MapTask
(Hadoop 2.1.0 版本)的代码,发现有问题的部分是 RecordReader
:
472 NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
473 org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
474 TaskReporter reporter,
475 org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
476 throws InterruptedException, IOException {
...
491 this.real = inputFormat.createRecordReader(split, taskContext);
...
494 }
...
519 @Override
520 public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
521 org.apache.hadoop.mapreduce.TaskAttemptContext context
522 ) throws IOException, InterruptedException {
523 long bytesInPrev = getInputBytes(fsStats);
524 real.initialize(split, context);
525 long bytesInCurr = getInputBytes(fsStats);
526 fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
527 }
当然是我代码的相关部分:
# MyInputFormat.java
public static void setEnvironmnet(Job job, String host, String port, boolean ssl, String APIKey) {
backend = new Backend(host, port, ssl, APIKey);
}
public static void addResId(Job job, String resId) {
Configuration conf = job.getConfiguration();
String inputs = conf.get(INPUT_RES_IDS, "");
if (inputs.isEmpty()) {
inputs += restId;
} else {
inputs += "," + resId;
}
conf.set(INPUT_RES_IDS, inputs);
}
@Override
public List<InputSplit> getSplits(JobContext job) {
// resulting splits container
List<InputSplit> splits = new ArrayList<InputSplit>();
// get the Job configuration
Configuration conf = job.getConfiguration();
// get the inputs, i.e. the list of resource IDs
String input = conf.get(INPUT_RES_IDS, "");
String[] resIDs = StringUtils.split(input);
// iterate on the resIDs
for (String resID: resIDs) {
splits.addAll(getSplitsResId(resID, job.getConfiguration()));
}
// return the splits
return splits;
}
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
if (backend == null) {
logger.info("Unable to create a MyRecordReader, it seems the environment was not properly set");
return null;
}
// create a record reader
return new MyRecordReader(backend, split, context);
}
# MyRecordReader.java
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
// get start, end and current positions
MyInputSplit inputSplit = (MyInputSplit) this.split;
start = inputSplit.getFirstRecordIndex();
end = start + inputSplit.getLength();
current = 0;
// query the third-party system for the related resource, seeking to the start of the split
records = backend.getRecords(inputSplit.getResId(), start, end);
}
# MapReduceTest.java
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MapReduceTest(), args);
System.exit(res);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
Job job = Job.getInstance(conf, "MapReduce test");
job.setJarByClass(MapReduceTest.class);
job.setMapperClass(MyMap.class);
job.setCombinerClass(MyReducer.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(MyInputFormat.class);
MyInputFormat.addInput(job, "ca73a799-9c71-4618-806e-7bd0ca1911f4");
InputFormat.setEnvironmnet(job, "my.host.com", "443", true, "my_api_key");
FileOutputFormat.setOutputPath(job, new Path(args[0]));
return job.waitForCompletion(true) ? 0 : 1;
}
有什么问题吗?
顺便说一句,"good" InputSplit
RecordReader
必须使用哪个,给构造函数的那个还是initialize
方法中给的那个?不管怎样,我已经尝试了这两个选项,结果错误是一样的:)
我读取你的 strack trace real
的方式在第 524 行为空。
但是不要相信我的话。在那里输入 assert
或 system.out.println
并自己检查 real
的值。
NullPointerException
几乎总是意味着你点掉了一些你不希望为空的东西。一些图书馆和馆藏会把它扔给你作为他们说 "this can't be null".
Error: java.lang.NullPointerException at
org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:524)
对我来说,这读作:在 org.apache.hadoop.mapred
包中,MapTask
class 有一个内部 class NewTrackingRecordReader
和 initialize
在第 524 行抛出 NullPointerException
的方法。
524 real.initialize( blah, blah) // I actually stopped reading after the dot
this.real
在第 491 行设置。
491 this.real = inputFormat.createRecordReader(split, taskContext);
假设您没有遗漏范围更小的 real
来掩盖 this.real
那么我们需要查看 inputFormat.createRecordReader(split, taskContext);
如果可以 return null
那么它可能是罪魁祸首。
当 backend
为 null 时,它会 return null
。
@Override
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit split,
TaskAttemptContext context) {
if (backend == null) {
logger.info("Unable to create a MyRecordReader, " +
"it seems the environment was not properly set");
return null;
}
// create a record reader
return new MyRecordReader(backend, split, context);
}
看起来 setEnvironmnet
应该设置 backend
# MyInputFormat.java
public static void setEnvironmnet(
Job job,
String host,
String port,
boolean ssl,
String APIKey) {
backend = new Backend(host, port, ssl, APIKey);
}
backend
必须在 setEnvironment
之外的某处声明(否则您会遇到编译器错误)。
如果 backend
在构造时没有被设置为非空的东西并且 setEnvironmnet
在 createRecordReader
之前没有被调用那么你应该期望得到 NullPointerException
你明白了。
更新:
如您所述,由于 setEnvironmnet()
是静态的 backend
也必须是静态的。这意味着您必须确保其他实例没有将其设置为空。
已解决。问题是 backend
变量被声明为 static
,即它 属于 到 java class,因此任何其他对象改变该变量(例如 null
)会影响同一 class.
现在,setEnvironment
添加主机、端口、ssl 使用和 API 键作为配置(与 setResId
已经对资源 ID 所做的相同);当调用 createRecordReader
时,会获取此配置并创建 backend
对象。
感谢 CandiedOrange 让我走上了正确的道路!