Why is my Hadoop Map-Reduce application processing the same data in two different reduce tasks?
I am studying the Hadoop MapReduce framework and following Hadoop: The Definitive Guide.
As described in the book, I have implemented a MapReduce job that reads each input file as a whole and writes its output through SequenceFileOutputFormat. Here are the classes I implemented:
SmallFilesToSequenceFileConverter.java
import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SmallFilesToSequenceFileConverter extends Configured implements Tool {

    static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

        private Text filenameKey;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Use the name of the file backing this split as the output key
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            filenameKey = new Text(path.getName());
        }

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            // Emit (file name, whole file contents)
            context.write(filenameKey, value);
        }
    }

    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        WholeFileInputFormat.setInputPaths(job, new Path(args[0]));
        SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setMapperClass(SequenceFileMapper.class);
        // Two reducers: output is split into part-r-00000 and part-r-00001
        job.setNumReduceTasks(2);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] argg = {"/Users/bng/Documents/hadoop/inputFromBook/smallFiles",
                "/Users/bng/Documents/hadoop/output_SmallFilesToSequenceFileConverter"};
        int exitcode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), argg);
        System.exit(exitcode);
    }
}
WholeFileInputFormat.java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Never split: each small file is handled as a single record
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}
WholeFileRecordReader.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // Read the whole file into a single BytesWritable value, exactly once
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // Nothing to close here; the input stream is closed in nextKeyValue()
    }
}
As shown in SmallFilesToSequenceFileConverter.java, everything works fine when I use a single reduce task, and I get the expected output, shown below:
//part-r-00000
SEQorg.apache.hadoop.io.Text"org.apache.hadoop.io.BytesWritable������xd[^•MÈÔg…h#Ÿa������a���
aaaaaaaaaa������b���
bbbbbbbbbb������c���
cccccccccc������d���
dddddddddd������dummy���ffffffffff
������e����������f���
ffffffffff
The problem is that when I use two reduce tasks, the output shows the same data being handled by both of them. Here is the output with two reduce tasks:
//part-r-00000
SEQorg.apache.hadoop.io.Text"org.apache.hadoop.io.BytesWritable������ÓÙE˜xØÏXØâÆU.êÚ������a���
aaaaaaaaaa������b�
bbbbbbbbbb������c
cccccccccc������e����
//part-r-00001
SEQorg.apache.hadoop.io.Text"org.apache.hadoop.io.BytesWritable������π¸ú∞8Á8˜lÍx∞:¿������b���
bbbbbbbbbb������d���
dddddddddd������dummy���ffffffffff
������f���
ffffffffff
This shows that the data "bbbbbbbbbb" is being processed by both reduce tasks. What could the problem be here? Is this result actually fine, or am I making a mistake somewhere?
For reference, the input directory contains six input files named a to f, each holding data corresponding to its file name; for example, the file named a contains the data "aaaaaaaaaaa", and the other files are similar, except that file e is empty. There is also a file named dummy that contains the data "ffffffffff".
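As far as I understand, the default HashPartitioner sends each Text key to exactly one reducer, so the same filename key should never appear in two part files within a single run. Below is a minimal standalone sketch (the PartitionCheck class is only an illustration, not part of the job above) that prints which of the two reducers the default partitioning rule would assign each filename key to:
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

// Illustration only: prints which reducer the default HashPartitioner
// would assign each filename key to when there are two reduce tasks.
public class PartitionCheck {
    public static void main(String[] args) {
        HashPartitioner<Text, BytesWritable> partitioner = new HashPartitioner<Text, BytesWritable>();
        for (String k : new String[]{"a", "b", "c", "d", "dummy", "e", "f"}) {
            int partition = partitioner.getPartition(new Text(k), new BytesWritable(), 2);
            System.out.println(k + " -> part-r-0000" + partition);
        }
    }
}
Since this mapping is deterministic, I expected each key to end up in exactly one of part-r-00000 and part-r-00001.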
I don't understand the exact reason.
However, deleting the namenode and datanode directories specified in hdfs-site.xml and restarting the HDFS, YARN, and MapReduce services solved the problem for me.
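To double-check the result after the restart, a small reader along the following lines can dump the keys stored in both part files. This is only a sketch: it assumes the cluster configuration is on the classpath and reuses the hard-coded output path from main above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Sketch: list the keys stored in each output sequence file.
public class DumpPartitionKeys {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String outputDir = "/Users/bng/Documents/hadoop/output_SmallFilesToSequenceFileConverter";
        for (String part : new String[]{"part-r-00000", "part-r-00001"}) {
            SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                    SequenceFile.Reader.file(new Path(outputDir, part)));
            try {
                Text key = new Text();
                BytesWritable value = new BytesWritable();
                while (reader.next(key, value)) {
                    System.out.println(part + ": " + key);
                }
            } finally {
                reader.close();
            }
        }
    }
}
After the fix, each filename should be listed under exactly one of the two part files.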