MapReduce Avro Output is Creating Text File Instead
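I have a MapReduce job that reads Avro data and is supposed to write Avro data. However, when I inspect the output files after the job succeeds, they do not have the .avro extension, and I can read them in a plain text editor.
My Driver is configured to output Avro, so I'm not sure where the problem is. Any help would be appreciated.
Here is my Driver class: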
public class Driver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Driver(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        job.setJarByClass(Driver.class);
        job.setJobName("nearestpatient");

        AvroJob.setOutputKeySchema(job, Pair.getPairSchema(Schema.create(Schema.Type.LONG), Schema.create(Schema.Type.STRING)));
        job.setOutputValueClass(NullWritable.class);

        job.setMapperClass(PatientMapper.class);
        job.setReducerClass(PatientReducer.class);

        job.setInputFormatClass(AvroKeyInputFormat.class);
        AvroJob.setInputKeySchema(job, PatientAvro.getClassSchema());

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
        return 0;
    }
}
Here is my Reducer class:
public class PatientReducer extends Reducer<LongWritable, LongWritable, AvroWrapper<Pair<Long, String>>, NullWritable> {

    @Override
    public void reduce(LongWritable providerKey, Iterable<LongWritable> patients, Context context) throws IOException, InterruptedException {
        String outputList = "[";
        List<Long> patientList = new ArrayList<>();
        for (LongWritable patientKey : patients) {
            outputList += new LongWritable(patientKey.get()) + ", ";
        }
        outputList = outputList.substring(0, outputList.length() - 2);
        outputList += "]";
        context.write(new AvroWrapper<Pair<Long, String>>(new Pair<Long, String>(providerKey.get(), outputList)), NullWritable.get());
    }
}
In your run() method, you need to add the following:
job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
In your code, replace the line
FileOutputFormat.setOutputPath(job, new Path(args[1]));
with
job.setOutputFormatClass(AvroKeyOutputFormat.class);
AvroKeyOutputFormat.setOutputPath(job, new Path(args[1]));
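After rerunning the job, one quick way to confirm that a part file really is an Avro container file rather than text is to look at its first four bytes: Avro object container files start with the ASCII characters Obj followed by the byte 0x01. The sketch below is not from the original answers. The class name AvroMagicCheck and its method names are made up for illustration, and it assumes the output file has already been copied to the local filesystem (for example with hadoop fs -get).

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper (not part of Avro or Hadoop): checks the 4-byte magic header.
final class AvroMagicCheck {

    // Avro object container files begin with 'O', 'b', 'j', 0x01.
    private static final byte[] MAGIC = {'O', 'b', 'j', 1};

    /** Returns true if the given leading bytes start with the Avro container magic. */
    public static boolean hasAvroMagic(byte[] header) {
        if (header == null || header.length < MAGIC.length) {
            return false;
        }
        for (int i = 0; i < MAGIC.length; i++) {
            if (header[i] != MAGIC[i]) {
                return false;
            }
        }
        return true;
    }

    /** Reads the first four bytes of a local file and checks them against the magic. */
    public static boolean isAvroContainerFile(Path file) throws IOException {
        byte[] header = new byte[MAGIC.length];
        int total = 0;
        try (InputStream in = Files.newInputStream(file)) {
            // read() may return fewer bytes than requested, so loop until full or EOF.
            while (total < header.length) {
                int n = in.read(header, total, header.length - total);
                if (n < 0) {
                    break;
                }
                total += n;
            }
        }
        return total == MAGIC.length && hasAvroMagic(header);
    }

    public static void main(String[] args) throws IOException {
        // Each argument is a path to a local copy of a job output file.
        for (String arg : args) {
            Path file = Paths.get(arg);
            String verdict = isAvroContainerFile(file) ? "Avro container file" : "not an Avro container file";
            System.out.println(file + ": " + verdict);
        }
    }
}
```

Running it against a local copy of one of the job's part files prints whether the header matches. A file written by the default text output format would fail the check, since it starts with the record text itself.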