将 WholeFileInputFormat 与 Hadoop MapReduce 结合使用仍然会导致 Mapper 一次处理 1 行
Using WholeFileInputFormat with Hadoop MapReduce still results in Mapper processing 1 line at a time
为了扩展我的 header 使用 Hadoop 2.6.. 并且需要将整个文件发送到我的映射器而不是一次发送一行。我按照权威指南中的 Tom Whites 代码创建了 WholeFileInputFormat 和 WholeFileRecordReader,但我的 Mapper 仍然一次处理 1 行文件。谁能看到我的代码中缺少什么?我完全按照我所看到的使用了书中的例子。任何指导将不胜感激。
WholeFileInputFormat.java
public class WholeFileInputFormat extends FileInputFormat <NullWritable, BytesWritable>{
@Override
protected boolean isSplitable(JobContext context, Path file){
return false;
}
@Override
public RecordReader<NullWritable, BytesWritable> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
WholeFileRecordReader reader = new WholeFileRecordReader();
reader.initialize(split, context);
return reader;
}
}
WholeFileRecordReader.java
public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
private FileSplit fileSplit;
private Configuration conf;
private BytesWritable value = new BytesWritable();
private boolean processed = false;
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException{
this.fileSplit = (FileSplit) split;
this.conf = context.getConfiguration();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException{
if (!processed){
byte[] contents = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try{
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
value.set(contents, 0, contents.length);
}finally{
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException{
return NullWritable.get();
}
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException{
return value;
}
@Override
public float getProgress() throws IOException {
return processed ? 1.0f : 0.0f;
}
@Override
public void close() throws IOException{
//do nothing :)
}
}
以及我的 Mapreduce 的主要方法
public class ECCCount {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.out.printf("Usage: ProcessLogs <input dir> <output dir>\n");
System.exit(-1);
}
//@SuppressWarnings("deprecation")
Job job = new Job();
job.setJarByClass(ECCCount.class);
job.setJobName("ECCCount");
//FileInputFormat.setInputPaths(job, new Path(args[0]));
WholeFileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(ECCCountMapper.class);
job.setReducerClass(SumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
}
还有我的 Mapper。现在它只是 returns 作为测试用例给出的值,以查看它返回的是一行还是整个文件
public class ECCCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.write(new Text(value), new IntWritable(1));
}
}
问题可能是映射器的输入格式。你有 LongWritable 和文本。但是在提到的示例中,他们使用了 NullWritable、BytesWritable,因为这是 WholeFileInputFormat 所具有的。此外,您需要在 Job class(main 方法)中提供 job.setInputFormatClass(WholeFileInputFormat.class);。希望对您有所帮助,祝您编码愉快
感谢 Ramzy 的输入,我发现了我的错误,并且能够通过以下更改使整个文件通过
在我的主要方法中,我需要指定我需要使用的 InputFormatClass。
job.setInputFormatClass(WholeFileInputFormat.class)
并且我的 Mapper 需要期望正确的类型作为输入
public class ECCCountMapper extends Mapper<NullWritable, BytesWritable, Text, IntWritable>{
这两个更改成功地将整个文件的一个字节[]发送到我的映射器,我在其中根据需要对其进行操作。
为了扩展我的 header 使用 Hadoop 2.6.. 并且需要将整个文件发送到我的映射器而不是一次发送一行。我按照权威指南中的 Tom Whites 代码创建了 WholeFileInputFormat 和 WholeFileRecordReader,但我的 Mapper 仍然一次处理 1 行文件。谁能看到我的代码中缺少什么?我完全按照我所看到的使用了书中的例子。任何指导将不胜感激。
WholeFileInputFormat.java
public class WholeFileInputFormat extends FileInputFormat <NullWritable, BytesWritable>{
@Override
protected boolean isSplitable(JobContext context, Path file){
return false;
}
@Override
public RecordReader<NullWritable, BytesWritable> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
WholeFileRecordReader reader = new WholeFileRecordReader();
reader.initialize(split, context);
return reader;
}
}
WholeFileRecordReader.java
public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
private FileSplit fileSplit;
private Configuration conf;
private BytesWritable value = new BytesWritable();
private boolean processed = false;
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException{
this.fileSplit = (FileSplit) split;
this.conf = context.getConfiguration();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException{
if (!processed){
byte[] contents = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try{
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
value.set(contents, 0, contents.length);
}finally{
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException{
return NullWritable.get();
}
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException{
return value;
}
@Override
public float getProgress() throws IOException {
return processed ? 1.0f : 0.0f;
}
@Override
public void close() throws IOException{
//do nothing :)
}
}
以及我的 Mapreduce 的主要方法
public class ECCCount {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.out.printf("Usage: ProcessLogs <input dir> <output dir>\n");
System.exit(-1);
}
//@SuppressWarnings("deprecation")
Job job = new Job();
job.setJarByClass(ECCCount.class);
job.setJobName("ECCCount");
//FileInputFormat.setInputPaths(job, new Path(args[0]));
WholeFileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(ECCCountMapper.class);
job.setReducerClass(SumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
}
还有我的 Mapper。现在它只是 returns 作为测试用例给出的值,以查看它返回的是一行还是整个文件
public class ECCCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.write(new Text(value), new IntWritable(1));
}
}
问题可能是映射器的输入格式。你有 LongWritable 和文本。但是在提到的示例中,他们使用了 NullWritable、BytesWritable,因为这是 WholeFileInputFormat 所具有的。此外,您需要在 Job class(main 方法)中提供 job.setInputFormatClass(WholeFileInputFormat.class);。希望对您有所帮助,祝您编码愉快
感谢 Ramzy 的输入,我发现了我的错误,并且能够通过以下更改使整个文件通过
在我的主要方法中,我需要指定我需要使用的 InputFormatClass。
job.setInputFormatClass(WholeFileInputFormat.class)
并且我的 Mapper 需要期望正确的类型作为输入
public class ECCCountMapper extends Mapper<NullWritable, BytesWritable, Text, IntWritable>{
这两个更改成功地将整个文件的一个字节[]发送到我的映射器,我在其中根据需要对其进行操作。