带有 Avro 的 Mapreduce - 通用解析
Mapreduce with Avro - Generic parsing
问题陈述:
- hdfs 中可用的 avro 格式数据。
- 上述 Avro 数据的模式(schema)也可用。
- 需要在 map reduce 中解析此 Avro 数据并生成具有相同模式的输出 avro 数据(需要清理传入的 Avro 数据)。
- 传入的 avro 数据可以是任何模式。
因此,要求是编写一个通用的 map reduce,它可以获取任何 Avro 数据,但以 Avro 格式生成与传入模式相同的输出。
代码(经过多次尝试,这是我达到的程度)
Driver
/**
 * Driver for a map-only job that reads Avro input and writes Avro output,
 * using the same schema for the input key and the output key.
 *
 * <p>Arguments: {@code <input path> <output path> <schema file>}. The schema file is
 * opened with {@link java.io.File}, i.e. from the filesystem of the submitting process.
 */
public class AvroDriver extends Configured implements Tool {

    /**
     * Configures and runs the job.
     *
     * @param args input path, output path and path to the Avro schema file, in that order
     * @return 0 if the job completed successfully, 1 if it failed,
     *         2 if fewer than three arguments were supplied
     * @throws Exception if the schema cannot be read or the job cannot be submitted
     */
    public int run(String[] args) throws Exception {
        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 3) {
            System.err.println(
                "Usage: hadoop jar <jar> <input_avro_data_path> <output_path> <path_to_the_input_avro_schema>");
            return 2;
        }

        // Job.getInstance replaces the deprecated Job(Configuration) constructor.
        Job job = Job.getInstance(getConf());
        job.setJarByClass(AvroMapper.class);
        job.setJobName("Avro With Xml Mapper");
        job.getConfiguration().setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true);
        //This is required to use avro-1.7.6 and above
        job.getConfiguration().set("mapreduce.job.user.classpath.first", "true");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // The same schema is registered for both the input key and the output key.
        Schema schema = new Schema.Parser().parse(new File(args[2]));

        job.setInputFormatClass(AvroKeyInputFormat.class);
        job.setMapperClass(AvroMapper.class);
        AvroJob.setInputKeySchema(job, schema);

        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        job.setMapOutputKeyClass(AvroKey.class);
        AvroJob.setOutputKeySchema(job, schema);

        // Map-only job: no reducers.
        job.setNumReduceTasks(0);
        return (job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Command-line entry point; delegates to {@link ToolRunner} and exits with its result.
     *
     * @param args command-line arguments forwarded to {@link #run(String[])}
     * @throws Exception if the tool fails
     */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new AvroDriver(), args);
        System.exit(res);
    }
}
映射器
/**
 * First attempt at a generic map-only Avro mapper (the version reported as failing).
 *
 * Copies every field of the incoming record into a new GenericData.Record built on the
 * incoming record's schema, but then emits an AvroKey that wraps a GenericData instance
 * rather than the copied record.
 */
public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData>, NullWritable> {
@Override
public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {
try {
// Debug output: the incoming key, its datum and the datum's schema.
System.out.println("Specific Record - " + key);
System.out.println("Datum :: " + key.datum());
System.out.println("Schema :: " + key.datum().getSchema());
List<Field> fields = key.datum().getSchema().getFields();
// New record that shares the incoming record's schema; values are copied by field name.
GenericRecord record = new GenericData.Record(key.datum().getSchema());
for(Field f : fields) {
System.out.println("Field Name - " + f.name());
record.put(f.name(), key.datum().get(f.name()));
}
System.out.println("Record - " + record);
// NOTE(review): the value returned by newRecord(...) is discarded, and the key emitted
// below wraps `d` (the GenericData instance), not `record`. The later revision of this
// mapper resolves the failure by emitting a GenericData.Record instead.
GenericData d = new GenericData();
d.newRecord(record, key.datum().getSchema());
AvroKey<GenericData> outkey = new AvroKey<GenericData>(d);
System.out.println("Generic Record (Avro Key) - " + outkey);
context.write(outkey, NullWritable.get());
} catch (Exception e) {
// Any failure is printed and rethrown as an IOException carrying only the message
// (the original exception is not chained as the cause).
e.printStackTrace();
throw new IOException(e.getMessage());
}
}
}
命令
hadoop jar $jar_name $input_avro_data_path $output_path $path_to_the_input_avro_schema
Avro 模式示例
{ "type" : "record", "name" : "Entity", "namespace" : "com.sample.avro", "fields".......
运行 MapReduce 作业时遇到的问题
Error running child : java.lang.NullPointerException: in
com.sample.avro.Entity null of com.sample.avro.Entity
org.apache.avro.file.DataFileWriter$AppendWriteException:
java.lang.NullPointerException: in com.sample.avro.Entity null of
com.sample.avro.Entity
环境
HDP 2.3 沙箱
有什么想法吗?
已更新
我尝试了以下但结果相同
/**
 * Second attempt at the generic map-only Avro mapper (reported as giving the same result).
 *
 * Builds a new record schema with the incoming schema's name and namespace in which every
 * field is declared as STRING, copies the incoming values into a record of that schema,
 * and still emits an AvroKey that wraps a GenericData instance rather than the record.
 */
public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData>, NullWritable> {
@Override
public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {
try {
// Debug output: the incoming key, its datum and the datum's schema.
System.out.println("Specific Record - " + key);
System.out.println("Datum :: " + key.datum());
System.out.println("Schema :: " + key.datum().getSchema());
List<Field> fields = key.datum().getSchema().getFields();
// Output schema: same record name and namespace as the input, fields filled in below.
Schema s = Schema.createRecord(key.datum().getSchema().getName(), null, key.datum().getSchema().getNamespace(), false);
List<Field> outFields = new ArrayList<Field>();
for(Field f : fields) {
System.out.println("Field Name - " + f.name());
// Every output field is declared as STRING regardless of the input field's type.
Schema.Field f1 = new Schema.Field(f.name(),Schema.create(Schema.Type.STRING), null,null);
outFields.add(f1);
}
s.setFields(outFields);
System.out.println("Out Schema - " + s);
GenericRecord record = new GenericData.Record(s);
// Values are copied unchanged by field name; they are not converted to strings.
for(Field f : fields) {
record.put(f.name(), key.datum().get(f.name()));
}
System.out.println("Record - " + record);
// NOTE(review): as in the first attempt, the result of newRecord(...) is discarded and
// the emitted key wraps `d` (the GenericData instance), not `record`.
GenericData d = new GenericData();
d.newRecord(record, s);
AvroKey<GenericData> outkey = new AvroKey<GenericData>(d);
System.out.println("Generic Record (Avro Key) - " + outkey.datum());
context.write(outkey, NullWritable.get());
} catch (Exception e) {
// NOTE(review): the exception is only printed here and not rethrown, so map() returns
// normally after a failure inside the try block.
e.printStackTrace();
}
}
}
请注意,avro 输入到 map reduce 工作正常,但以 Avro 格式输出是这里的问题。
最后,我找到了答案,映射器代码如下。
我不再发出包装 GenericData 的 AvroKey,而是改为发出包装 GenericData.Record 的 AvroKey。
/**
 * Generic map-only Avro mapper: copies each incoming record into a new
 * {@code GenericData.Record} and emits it wrapped in an {@code AvroKey}.
 *
 * <p>The emitted record is built on a freshly created schema that reuses the incoming
 * schema's name and namespace and declares every field as STRING; the field values
 * themselves are copied unchanged.
 */
public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData.Record>, NullWritable> {

    /**
     * Copies one input record to the output.
     *
     * @param key     wrapper around the incoming Avro record
     * @param value   unused
     * @param context task context the copied record is written to
     * @throws IOException if anything in the copy or write fails; the original exception
     *                     is attached as the cause
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {
        try {
            System.out.println("Specific Record - " + key);

            // Read the datum and its schema once instead of on every use.
            GenericData.Record datum = key.datum();
            Schema inSchema = datum.getSchema();
            System.out.println("Datum :: " + datum);
            System.out.println("Schema :: " + inSchema);

            List<Field> fields = inSchema.getFields();

            // Output schema: same record name/namespace, every field declared as STRING.
            Schema s = Schema.createRecord(inSchema.getName(), null, inSchema.getNamespace(), false);
            List<Field> outFields = new ArrayList<Field>();
            for (Field f : fields) {
                System.out.println("Field Name - " + f.name());
                Schema.Field f1 = new Schema.Field(f.name(), Schema.create(Schema.Type.STRING), null, null);
                outFields.add(f1);
            }
            s.setFields(outFields);
            System.out.println("Out Schema - " + s);

            // Copy the values by field name into the new record.
            GenericData.Record record = new GenericData.Record(s);
            for (Field f : fields) {
                record.put(f.name(), datum.get(f.name()));
            }
            System.out.println("Record - " + record);

            // Emit the record itself (not a GenericData instance) as the key's datum.
            AvroKey<GenericData.Record> outkey = new AvroKey<GenericData.Record>(record);
            System.out.println("Generic Record (Avro Key) - " + outkey.datum());
            context.write(outkey, NullWritable.get());
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(e);
            System.out.println(e.getMessage());
            // Chain the original exception so its type and stack trace are not lost.
            throw new IOException(e.getMessage(), e);
        }
    }
}
问题陈述:
- hdfs 中可用的 avro 格式数据。
- 上述 Avro 数据的模式(schema)也可用。
- 需要在 map reduce 中解析此 Avro 数据并生成具有相同模式的输出 avro 数据(需要清理传入的 Avro 数据)。
- 传入的 avro 数据可以是任何模式。
因此,要求是编写一个通用的 map reduce,它可以获取任何 Avro 数据,但以 Avro 格式生成与传入模式相同的输出。
代码(经过多次尝试,这是我达到的程度)
Driver
/**
 * Driver for a map-only job that reads Avro input and writes Avro output,
 * using the same schema for the input key and the output key.
 *
 * <p>Arguments: {@code <input path> <output path> <schema file>}. The schema file is
 * opened with {@link java.io.File}, i.e. from the filesystem of the submitting process.
 */
public class AvroDriver extends Configured implements Tool {

    /**
     * Configures and runs the job.
     *
     * @param args input path, output path and path to the Avro schema file, in that order
     * @return 0 if the job completed successfully, 1 if it failed,
     *         2 if fewer than three arguments were supplied
     * @throws Exception if the schema cannot be read or the job cannot be submitted
     */
    public int run(String[] args) throws Exception {
        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 3) {
            System.err.println(
                "Usage: hadoop jar <jar> <input_avro_data_path> <output_path> <path_to_the_input_avro_schema>");
            return 2;
        }

        // Job.getInstance replaces the deprecated Job(Configuration) constructor.
        Job job = Job.getInstance(getConf());
        job.setJarByClass(AvroMapper.class);
        job.setJobName("Avro With Xml Mapper");
        job.getConfiguration().setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true);
        //This is required to use avro-1.7.6 and above
        job.getConfiguration().set("mapreduce.job.user.classpath.first", "true");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // The same schema is registered for both the input key and the output key.
        Schema schema = new Schema.Parser().parse(new File(args[2]));

        job.setInputFormatClass(AvroKeyInputFormat.class);
        job.setMapperClass(AvroMapper.class);
        AvroJob.setInputKeySchema(job, schema);

        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        job.setMapOutputKeyClass(AvroKey.class);
        AvroJob.setOutputKeySchema(job, schema);

        // Map-only job: no reducers.
        job.setNumReduceTasks(0);
        return (job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Command-line entry point; delegates to {@link ToolRunner} and exits with its result.
     *
     * @param args command-line arguments forwarded to {@link #run(String[])}
     * @throws Exception if the tool fails
     */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new AvroDriver(), args);
        System.exit(res);
    }
}
映射器
/**
 * First attempt at a generic map-only Avro mapper (the version reported as failing).
 *
 * Copies every field of the incoming record into a new GenericData.Record built on the
 * incoming record's schema, but then emits an AvroKey that wraps a GenericData instance
 * rather than the copied record.
 */
public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData>, NullWritable> {
@Override
public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {
try {
// Debug output: the incoming key, its datum and the datum's schema.
System.out.println("Specific Record - " + key);
System.out.println("Datum :: " + key.datum());
System.out.println("Schema :: " + key.datum().getSchema());
List<Field> fields = key.datum().getSchema().getFields();
// New record that shares the incoming record's schema; values are copied by field name.
GenericRecord record = new GenericData.Record(key.datum().getSchema());
for(Field f : fields) {
System.out.println("Field Name - " + f.name());
record.put(f.name(), key.datum().get(f.name()));
}
System.out.println("Record - " + record);
// NOTE(review): the value returned by newRecord(...) is discarded, and the key emitted
// below wraps `d` (the GenericData instance), not `record`. The later revision of this
// mapper resolves the failure by emitting a GenericData.Record instead.
GenericData d = new GenericData();
d.newRecord(record, key.datum().getSchema());
AvroKey<GenericData> outkey = new AvroKey<GenericData>(d);
System.out.println("Generic Record (Avro Key) - " + outkey);
context.write(outkey, NullWritable.get());
} catch (Exception e) {
// Any failure is printed and rethrown as an IOException carrying only the message
// (the original exception is not chained as the cause).
e.printStackTrace();
throw new IOException(e.getMessage());
}
}
}
命令
hadoop jar $jar_name $input_avro_data_path $output_path $path_to_the_input_avro_schema
Avro 模式示例
{ "type" : "record", "name" : "Entity", "namespace" : "com.sample.avro", "fields".......
运行 MapReduce 作业时遇到的问题
Error running child : java.lang.NullPointerException: in com.sample.avro.Entity null of com.sample.avro.Entity
org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.sample.avro.Entity null of com.sample.avro.Entity
环境
HDP 2.3 沙箱
有什么想法吗?
已更新
我尝试了以下但结果相同
/**
 * Second attempt at the generic map-only Avro mapper (reported as giving the same result).
 *
 * Builds a new record schema with the incoming schema's name and namespace in which every
 * field is declared as STRING, copies the incoming values into a record of that schema,
 * and still emits an AvroKey that wraps a GenericData instance rather than the record.
 */
public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData>, NullWritable> {
@Override
public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {
try {
// Debug output: the incoming key, its datum and the datum's schema.
System.out.println("Specific Record - " + key);
System.out.println("Datum :: " + key.datum());
System.out.println("Schema :: " + key.datum().getSchema());
List<Field> fields = key.datum().getSchema().getFields();
// Output schema: same record name and namespace as the input, fields filled in below.
Schema s = Schema.createRecord(key.datum().getSchema().getName(), null, key.datum().getSchema().getNamespace(), false);
List<Field> outFields = new ArrayList<Field>();
for(Field f : fields) {
System.out.println("Field Name - " + f.name());
// Every output field is declared as STRING regardless of the input field's type.
Schema.Field f1 = new Schema.Field(f.name(),Schema.create(Schema.Type.STRING), null,null);
outFields.add(f1);
}
s.setFields(outFields);
System.out.println("Out Schema - " + s);
GenericRecord record = new GenericData.Record(s);
// Values are copied unchanged by field name; they are not converted to strings.
for(Field f : fields) {
record.put(f.name(), key.datum().get(f.name()));
}
System.out.println("Record - " + record);
// NOTE(review): as in the first attempt, the result of newRecord(...) is discarded and
// the emitted key wraps `d` (the GenericData instance), not `record`.
GenericData d = new GenericData();
d.newRecord(record, s);
AvroKey<GenericData> outkey = new AvroKey<GenericData>(d);
System.out.println("Generic Record (Avro Key) - " + outkey.datum());
context.write(outkey, NullWritable.get());
} catch (Exception e) {
// NOTE(review): the exception is only printed here and not rethrown, so map() returns
// normally after a failure inside the try block.
e.printStackTrace();
}
}
}
请注意,avro 输入到 map reduce 工作正常,但以 Avro 格式输出是这里的问题。
最后,我找到了答案,映射器代码如下。我不再发出包装 GenericData 的 AvroKey,而是改为发出包装 GenericData.Record 的 AvroKey。
/**
 * Generic map-only Avro mapper: copies each incoming record into a new
 * {@code GenericData.Record} and emits it wrapped in an {@code AvroKey}.
 *
 * <p>The emitted record is built on a freshly created schema that reuses the incoming
 * schema's name and namespace and declares every field as STRING; the field values
 * themselves are copied unchanged.
 */
public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData.Record>, NullWritable> {

    /**
     * Copies one input record to the output.
     *
     * @param key     wrapper around the incoming Avro record
     * @param value   unused
     * @param context task context the copied record is written to
     * @throws IOException if anything in the copy or write fails; the original exception
     *                     is attached as the cause
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {
        try {
            System.out.println("Specific Record - " + key);

            // Read the datum and its schema once instead of on every use.
            GenericData.Record datum = key.datum();
            Schema inSchema = datum.getSchema();
            System.out.println("Datum :: " + datum);
            System.out.println("Schema :: " + inSchema);

            List<Field> fields = inSchema.getFields();

            // Output schema: same record name/namespace, every field declared as STRING.
            Schema s = Schema.createRecord(inSchema.getName(), null, inSchema.getNamespace(), false);
            List<Field> outFields = new ArrayList<Field>();
            for (Field f : fields) {
                System.out.println("Field Name - " + f.name());
                Schema.Field f1 = new Schema.Field(f.name(), Schema.create(Schema.Type.STRING), null, null);
                outFields.add(f1);
            }
            s.setFields(outFields);
            System.out.println("Out Schema - " + s);

            // Copy the values by field name into the new record.
            GenericData.Record record = new GenericData.Record(s);
            for (Field f : fields) {
                record.put(f.name(), datum.get(f.name()));
            }
            System.out.println("Record - " + record);

            // Emit the record itself (not a GenericData instance) as the key's datum.
            AvroKey<GenericData.Record> outkey = new AvroKey<GenericData.Record>(record);
            System.out.println("Generic Record (Avro Key) - " + outkey.datum());
            context.write(outkey, NullWritable.get());
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(e);
            System.out.println(e.getMessage());
            // Chain the original exception so its type and stack trace are not lost.
            throw new IOException(e.getMessage(), e);
        }
    }
}