MapReduce with Avro - Generic parsing

Problem statement:

  1. Avro-format data is available in HDFS.
  2. The schema for the above Avro data is also available.
  3. This Avro data needs to be parsed in MapReduce, producing output Avro data with the same schema (the incoming Avro data needs to be cleansed).
  4. The incoming Avro data can have any schema.

So the requirement is to write a generic MapReduce job that can take any Avro data and produce output, in Avro format, with the same schema as the input.

Code (after many attempts, this is how far I have got)

Driver

public class AvroDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(AvroMapper.class);
        job.setJobName("Avro With Xml Mapper");
        job.getConfiguration().setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true);

        //This is required to use avro-1.7.6 and above
        job.getConfiguration().set("mapreduce.job.user.classpath.first", "true");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setInputFormatClass(AvroKeyInputFormat.class);
        job.setMapperClass(AvroMapper.class);
        Schema schema = new Schema.Parser().parse(new File(args[2]));
        AvroJob.setInputKeySchema(job, schema);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        job.setMapOutputKeyClass(AvroKey.class);
        AvroJob.setOutputKeySchema(job, schema);
        job.setNumReduceTasks(0);
        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new AvroDriver(), args);
        System.exit(res);
    }
}
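
One thing worth noting about the driver: the problem statement says the schema lives in HDFS, but new File(args[2]) resolves against the driver's local file system. Below is a minimal sketch of loading the .avsc from HDFS instead, assuming args[2] is an HDFS path:

// Sketch only; requires org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path
// and java.io.InputStream in addition to the driver's existing imports.
Path schemaPath = new Path(args[2]);
FileSystem fs = schemaPath.getFileSystem(getConf());
Schema schema;
try (InputStream in = fs.open(schemaPath)) {
    // Schema.Parser can parse directly from an InputStream
    schema = new Schema.Parser().parse(in);
}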

Mapper

public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData>, NullWritable> {

        @Override
        public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {

            try {
                System.out.println("Specific Record - " + key);
                System.out.println("Datum :: " + key.datum());
                System.out.println("Schema :: " + key.datum().getSchema());
                List<Field> fields = key.datum().getSchema().getFields();


                GenericRecord record = new GenericData.Record(key.datum().getSchema());
                for(Field f : fields) {
                    System.out.println("Field Name - " + f.name());
                    record.put(f.name(), key.datum().get(f.name()));
                }
                System.out.println("Record - " + record);
                // Note: GenericData is Avro's data-model class, not a record datum;
                // the record returned by newRecord(...) is discarded here.
                GenericData d = new GenericData();
                d.newRecord(record, key.datum().getSchema());
                AvroKey<GenericData> outkey = new AvroKey<GenericData>(d);

                System.out.println("Generic Record (Avro Key) - " + outkey);
                context.write(outkey, NullWritable.get());
            } catch (Exception e) {
                e.printStackTrace();
                throw new IOException(e.getMessage());
            }
        }
    }

Command

hadoop jar $jar_name $input_avro_data_path $output_path $path_to_the_input_avro_schema
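
For example, with hypothetical paths (note that the third argument is a local path, since the driver reads it with java.io.File):

hadoop jar avro-generic-parser.jar /data/incoming/avro /data/cleansed/avro /tmp/entity.avsc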

Sample Avro schema

{ "type" : "record", "name" : "Entity", "namespace" : "com.sample.avro", "fields".......

The error I get when I run the MapReduce job

Error running child : java.lang.NullPointerException: in com.sample.avro.Entity null of com.sample.avro.Entity

org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.sample.avro.Entity null of com.sample.avro.Entity

Environment

HDP 2.3 Sandbox

Any ideas?

UPDATED

I tried the following, but got the same result.

public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData>, NullWritable> {

        @Override
        public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {

            try {
                System.out.println("Specific Record - " + key);
                System.out.println("Datum :: " + key.datum());
                System.out.println("Schema :: " + key.datum().getSchema());
                List<Field> fields = key.datum().getSchema().getFields();

                Schema s = Schema.createRecord(key.datum().getSchema().getName(), null, key.datum().getSchema().getNamespace(), false);
                List<Field> outFields  = new ArrayList<Field>();
                for(Field f : fields) {
                    System.out.println("Field Name - " + f.name());
                    Schema.Field f1 = new Schema.Field(f.name(),Schema.create(Schema.Type.STRING), null,null);
                    outFields.add(f1);
                }
                s.setFields(outFields);

                System.out.println("Out Schema - " + s);
                GenericRecord record = new GenericData.Record(s);
                for(Field f : fields) {
                    record.put(f.name(), key.datum().get(f.name()));
                }
                System.out.println("Record - " + record);
                // Same mistake as above: the GenericData wrapper itself is emitted,
                // not the record returned by newRecord(...).
                GenericData d = new GenericData();
                d.newRecord(record, s);
                AvroKey<GenericData> outkey = new AvroKey<GenericData>(d);
                System.out.println("Generic Record (Avro Key) - " + outkey.datum());
                context.write(outkey, NullWritable.get());
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }

Please note that reading the Avro input into the MapReduce job works fine; writing the output in Avro format is the problem here.

Finally, I found the answer; the mapper code is below. Instead of emitting an AvroKey built from a GenericData object, I changed it to emit a GenericData.Record. The GenericData instance is Avro's data model rather than an actual record datum, so the datum writer had nothing it could serialize, which appears to be what surfaced as the NullPointerException inside DataFileWriter.

public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData.Record>, NullWritable> {

        @Override
        public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {

            try {
                System.out.println("Specific Record - " + key);
                System.out.println("Datum :: " + key.datum());
                System.out.println("Schema :: " + key.datum().getSchema());
                List<Field> fields = key.datum().getSchema().getFields();

                Schema s = Schema.createRecord(key.datum().getSchema().getName(), null, key.datum().getSchema().getNamespace(), false);
                List<Field> outFields  = new ArrayList<Field>();
                for(Field f : fields) {
                    System.out.println("Field Name - " + f.name());
                    Schema.Field f1 = new Schema.Field(f.name(),Schema.create(Schema.Type.STRING), null,null);
                    outFields.add(f1);
                }
                s.setFields(outFields);

                System.out.println("Out Schema - " + s);
                GenericData.Record record = new GenericData.Record(s);
                for(Field f : fields) {
                    record.put(f.name(), key.datum().get(f.name()));
                }
                System.out.println("Record - " + record);
                AvroKey<GenericData.Record> outkey = new AvroKey<GenericData.Record>(record);
                System.out.println("Generic Record (Avro Key) - " + outkey.datum());
                context.write(outkey, NullWritable.get());
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println(e);
                System.out.println(e.getMessage());
                throw new IOException(e.getMessage());
            }
        }
    }
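
As an aside, this working mapper rebuilds the schema with every field coerced to STRING, which drifts from the stated requirement of keeping the input schema. If the output really must keep the input schema unchanged, a simpler pass-through sketch (my assumption, not part of the original answer) could re-emit the incoming record directly:

public static class AvroPassThroughMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData.Record>, NullWritable> {

    @Override
    public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {
        // Any cleansing of field values would go here; the record keeps its original schema.
        context.write(new AvroKey<GenericData.Record>(key.datum()), NullWritable.get());
    }
}

This also matches the driver above, which registers the same schema for both input and output via AvroJob.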