Hadoop - 如何从 mapred.JobConf 中提取 taskId?
Hadoop - How to extract a taskId from mapred.JobConf?
是否可以从 *mapred*.JobConf
创建一个有效的 *mapreduce*.TaskAttemptID
?
背景
我需要为 ExistingFileInputFormat
写一个 FileInputFormatAdapter
。问题是适配器需要扩展 mapred.InputFormat
而现有格式扩展 mapreduce.InputFormat
.
我需要构建一个 mapreduce.TaskAttemptContextImpl
,以便我可以实例化 ExistingRecordReader
。但是,我无法创建有效的 TaskId
...taskId 结果为空。
那么如何从 mapred.JobConf
.
中获取 taskId、jobId 等
特别是在适配器的 getRecordReader
中,我需要做类似的事情:
public org.apache.hadoop.mapred.RecordReader<NullWritable, MyWritable> getRecordReader(
org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter) throws IOException {
SplitAdapter splitAdapter = (SplitAdapter) split;
final Configuration conf = job;
/*************************************************/
//The problem is here, "mapred.task.id" is not in the conf
/*************************************************/
final TaskAttemptID taskId = TaskAttemptID.forName(conf.get("mapred.task.id"));
final TaskAttemptContext context = new TaskAttemptContextImpl(conf, taskId);
try {
return new RecordReaderAdapter(new ExistingRecordReader(
splitAdapter.getMapRedeuceSplit(),
context));
} catch (InterruptedException e) {
throw new RuntimeException("Failed to create record-reader.", e);
}
}
此代码抛出异常:
Caused by: java.lang.NullPointerException
at org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.<init>(TaskAttemptContextImpl.java:44)
at org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.<init>(TaskAttemptContextImpl.java:39)
'super(conf, taskId.getJobID());' 抛出异常,很可能是因为 taskId 为空。
我通过浏览 HiveHbaseTableInputFormat
找到了答案。由于我的解决方案是针对配置单元的,因此非常有效。
TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext(
job.getConfiguration(), reporter);
是否可以从 *mapred*.JobConf
创建一个有效的 *mapreduce*.TaskAttemptID
?
背景
我需要为 ExistingFileInputFormat
写一个 FileInputFormatAdapter
。问题是适配器需要扩展 mapred.InputFormat
而现有格式扩展 mapreduce.InputFormat
.
我需要构建一个 mapreduce.TaskAttemptContextImpl
,以便我可以实例化 ExistingRecordReader
。但是,我无法创建有效的 TaskId
...taskId 结果为空。
那么如何从 mapred.JobConf
.
特别是在适配器的 getRecordReader
中,我需要做类似的事情:
public org.apache.hadoop.mapred.RecordReader<NullWritable, MyWritable> getRecordReader(
org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter) throws IOException {
SplitAdapter splitAdapter = (SplitAdapter) split;
final Configuration conf = job;
/*************************************************/
//The problem is here, "mapred.task.id" is not in the conf
/*************************************************/
final TaskAttemptID taskId = TaskAttemptID.forName(conf.get("mapred.task.id"));
final TaskAttemptContext context = new TaskAttemptContextImpl(conf, taskId);
try {
return new RecordReaderAdapter(new ExistingRecordReader(
splitAdapter.getMapRedeuceSplit(),
context));
} catch (InterruptedException e) {
throw new RuntimeException("Failed to create record-reader.", e);
}
}
此代码抛出异常:
Caused by: java.lang.NullPointerException
at org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.<init>(TaskAttemptContextImpl.java:44)
at org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.<init>(TaskAttemptContextImpl.java:39)
'super(conf, taskId.getJobID());' 抛出异常,很可能是因为 taskId 为空。
我通过浏览 HiveHbaseTableInputFormat
找到了答案。由于我的解决方案是针对配置单元的,因此非常有效。
TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext(
job.getConfiguration(), reporter);