为什么我在 hadoop 的 mapreduce 中得到 3xx 重复项?
Why do i get 3xx duplicates in hadoop's mapreduce?
我正在使用 hadoop 的 mapreduce 从 hdfs 中读取文件,将其放入一个简单的解析器,然后将该解析器的输出写回到 hdfs。我还没有减少任务。我想知道为什么我的输出文件中有大约 300 个重复项。
这是我的地图方法。
public void map(LongWritable key, Text value,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
FileSplit fsplit = (FileSplit) reporter.getInputSplit();
Main parser = new Main();
String datFilePath = fsplit.getPath().getName();
String valueMap = "/path/to/file";
Path pt = fsplit.getPath();
FileSystem fs = null;
try {
fs = FileSystem.get(new URI("hdfs://xxx.xxx.x.x:xxxx"),
new Configuration());
} catch (URISyntaxException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try (FSDataInputStream inputStream = fs.open(pt)) {
ReadableByteChannel channel = Channels.newChannel(inputStream);
ByteBuffer buffer = ByteBuffer.allocate((int) fs.getFileStatus(pt).getLen());
channel.read(buffer);
buffer.order(ByteOrder.LITTLE_ENDIAN);
SimpleKeyValueStructure map = parser.parse(datFilePath, buffer,
valueMap);
String lrtransPath = map.getInputIdentifier();
SortedMap<String, Object> data = map.getData();
for (Entry<String, Object> entry : data.entrySet()) {
term.set(entry.getKey());
pathToFile.set(entry.getValue().toString());
output.collect(term, pathToFile);
}
count += 1;
System.out.println(count);
}
}
}
我最后打印出来了,确实是3xx。这是配置问题吗?我的工作配置:
JobConf conf = new JobConf(MapReduce.class);
conf.setJobName("jobxyz");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setNumReduceTasks(0);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
输出完全正确,但重复了。
A Mapper
为文件的每个输入拆分调用; Mapper
的 map()
会为每条记录调用。由于您的代码对于每个输入拆分中的每条记录都是 运行,因此您会得到重复项。
我正在使用 hadoop 的 mapreduce 从 hdfs 中读取文件,将其放入一个简单的解析器,然后将该解析器的输出写回到 hdfs。我还没有减少任务。我想知道为什么我的输出文件中有大约 300 个重复项。
这是我的地图方法。
public void map(LongWritable key, Text value,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
FileSplit fsplit = (FileSplit) reporter.getInputSplit();
Main parser = new Main();
String datFilePath = fsplit.getPath().getName();
String valueMap = "/path/to/file";
Path pt = fsplit.getPath();
FileSystem fs = null;
try {
fs = FileSystem.get(new URI("hdfs://xxx.xxx.x.x:xxxx"),
new Configuration());
} catch (URISyntaxException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try (FSDataInputStream inputStream = fs.open(pt)) {
ReadableByteChannel channel = Channels.newChannel(inputStream);
ByteBuffer buffer = ByteBuffer.allocate((int) fs.getFileStatus(pt).getLen());
channel.read(buffer);
buffer.order(ByteOrder.LITTLE_ENDIAN);
SimpleKeyValueStructure map = parser.parse(datFilePath, buffer,
valueMap);
String lrtransPath = map.getInputIdentifier();
SortedMap<String, Object> data = map.getData();
for (Entry<String, Object> entry : data.entrySet()) {
term.set(entry.getKey());
pathToFile.set(entry.getValue().toString());
output.collect(term, pathToFile);
}
count += 1;
System.out.println(count);
}
}
}
我最后打印出来了,确实是3xx。这是配置问题吗?我的工作配置:
JobConf conf = new JobConf(MapReduce.class);
conf.setJobName("jobxyz");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setNumReduceTasks(0);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
输出完全正确,但重复了。
A Mapper
为文件的每个输入拆分调用; Mapper
的 map()
会为每条记录调用。由于您的代码对于每个输入拆分中的每条记录都是 运行,因此您会得到重复项。