在 MapReduce 中写入多个 O/P 文件时出现问题
Issue while Writing Multiple O/P Files in MapReduce
我需要根据过滤条件将我的输入文件拆分为 2 个输出文件。我的输出目录应该如下所示:
/hdfs/base/dir/matched/YYYY/MM/DD
/hdfs/base/dir/notmatched/YYYY/MM/DD
我正在使用 MultipleOutputs
class 在我的地图函数中拆分我的数据。
在我的驱动程序 class 中,我使用如下:
FileOutputFormat.setOutputPath(job, new Path("/hdfs/base/dir"));
我在下面使用的 Mapper 中:
mos.write(key, value, fileName); // File Name is generating based on filter criteria
这个程序一天都运行良好。但是第二天我的程序失败了:
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://nameservice1/hdfs/base/dir already exists
第二天我不能使用不同的基本目录。
我该如何处理这种情况?
注意:我不想两次读取输入以创建 2 个单独的文件。
您可以在输出值中有一个标志列。稍后您可以处理输出并将其按标志列拆分。
创建自定义 o/p 格式 class 如下所示
package com.visa.util;
import java.io.IOException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
public class CostomOutputFormat<K, V> extends SequenceFileOutputFormat<K, V>{
@Override
public void checkOutputSpecs(JobContext arg0) throws IOException {
}
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext arg0) throws IOException {
return super.getOutputCommitter(arg0);
}
@Override
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext arg0) throws IOException, InterruptedException {
return super.getRecordWriter(arg0);
}
}
并在驱动程序中使用它 class:
job.setOutputFormatClass(CostomOutputFormat.class);
这将跳过检查 o/p 目录是否存在。
我需要根据过滤条件将我的输入文件拆分为 2 个输出文件。我的输出目录应该如下所示:
/hdfs/base/dir/matched/YYYY/MM/DD
/hdfs/base/dir/notmatched/YYYY/MM/DD
我正在使用 MultipleOutputs
class 在我的地图函数中拆分我的数据。
在我的驱动程序 class 中,我使用如下:
FileOutputFormat.setOutputPath(job, new Path("/hdfs/base/dir"));
我在下面使用的 Mapper 中:
mos.write(key, value, fileName); // File Name is generating based on filter criteria
这个程序一天都运行良好。但是第二天我的程序失败了:
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://nameservice1/hdfs/base/dir already exists
第二天我不能使用不同的基本目录。
我该如何处理这种情况?
注意:我不想两次读取输入以创建 2 个单独的文件。
您可以在输出值中有一个标志列。稍后您可以处理输出并将其按标志列拆分。
创建自定义 o/p 格式 class 如下所示
package com.visa.util;
import java.io.IOException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
public class CostomOutputFormat<K, V> extends SequenceFileOutputFormat<K, V>{
@Override
public void checkOutputSpecs(JobContext arg0) throws IOException {
}
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext arg0) throws IOException {
return super.getOutputCommitter(arg0);
}
@Override
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext arg0) throws IOException, InterruptedException {
return super.getRecordWriter(arg0);
}
}
并在驱动程序中使用它 class:
job.setOutputFormatClass(CostomOutputFormat.class);
这将跳过检查 o/p 目录是否存在。