How to convert Avro data to a date

I have an Avro schema like the one below:

    {
      "namespace": "example.avro",
      "type": "record",
      "name": "UserDate",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "date", "type": [{"type": "int", "logicalType": "date"}, "null"]},
        {"name": "datenotnullable", "type": {"type": "string", "logicalType": "date"}}
      ]
    }

When I read the Avro data back, I plan to detect the logicalType and, if it is "date", convert the value to a date.

How do I go about this?

    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
    GenericRecord user = null;
    try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file, datumReader)) {
        while (dataFileReader.hasNext()) {
            // Pass the previous record back in so the reader can reuse it.
            user = dataFileReader.next(user);
        }
    }

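The data returned by the code above is still the raw primitive type.

I'm not sure how to get this data as the logical type that the schema defines.

From what I've read, I think I need to implement a Conversion somehow, but I'm not sure how to do that...

Any help?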

I found the answer here.

I changed my code as shown below, and it works:

    // Register the custom conversion so the reader applies it to the matching logical type.
    final GenericData genericData = new GenericData();
    genericData.addLogicalTypeConversion(new MyTimestampConversion());
    // Arguments: writer schema, reader schema, and the GenericData that carries the conversion.
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema1, schema1, genericData);
    //DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema); //original code

    File file2 = new File("long.avro");

    GenericRecord user = null;

    try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file, datumReader)) {
        while (dataFileReader.hasNext()) {
            user = dataFileReader.next(user);
        }
    }

The conversion I added:

    public static class MyTimestampConversion extends Conversion<String> {
        public MyTimestampConversion() {
        }

        @Override
        public Class<String> getConvertedType() {
            return String.class;
        }

        @Override
        public String getLogicalTypeName() {
            return "timestamp-millis";
        }

        // Epoch milliseconds -> ISO-8601 string in UTC (Joda-Time).
        @Override
        public String fromLong(Long millisFromEpoch, Schema schema, LogicalType type) {
            return new DateTime(millisFromEpoch, DateTimeZone.UTC).toString();
        }

        // Inverse of fromLong: ISO-8601 string -> epoch milliseconds.
        @Override
        public Long toLong(String timestamp, Schema schema, LogicalType type) {
            return new DateTime(timestamp, DateTimeZone.UTC).getMillis();
        }

        // Joda-Time parses a String, so convert the CharSequence (e.g. Avro's Utf8) first.
        @Override
        public String fromCharSequence(CharSequence value, Schema schema, LogicalType type) {
            return new DateTime(value.toString(), DateTimeZone.UTC).toString();
        }

        @Override
        public CharSequence toCharSequence(String value, Schema schema, LogicalType type) {
            return value;
        }
    }
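
The conversion above handles `timestamp-millis`, while the schema in the question uses the `date` logical type, which the Avro specification defines as an `int` counting days since 1970-01-01. Here is a minimal sketch of the arithmetic that a `Conversion<LocalDate>` could put in its `fromInt`/`toInt` methods, assuming Java 8+ and `java.time`. `AvroDateSketch` and its method names are hypothetical and not part of Avro.

```java
import java.time.LocalDate;

// Hypothetical helper (not an Avro class): the core of a "date" conversion.
final class AvroDateSketch {
    private AvroDateSketch() {
    }

    // Avro "date" value (days since 1970-01-01) -> LocalDate.
    static LocalDate fromEpochDays(int daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch);
    }

    // LocalDate -> Avro "date" value; throws ArithmeticException if it does not fit in an int.
    static int toEpochDays(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }

    public static void main(String[] args) {
        System.out.println(fromEpochDays(0));    // 1970-01-01
        System.out.println(fromEpochDays(6020)); // 1986-06-26
    }
}
```

Recent Avro releases also appear to ship ready-made conversions in `org.apache.avro.data.TimeConversions` (for example `DateConversion`), which may make a hand-written class unnecessary. Check what the version you are on provides.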

Result (original, not working)

{"DateModified": 520171631042}
{"DateModified": 0}

Result (current, working)

{"DateModified": "1986-06-26T12:07:11.042Z"}
{"DateModified": "1970-01-01T00:00:00.000Z"}
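
If you would rather not depend on Joda-Time, the same strings can probably be produced with `java.time`. This is a rough sketch assuming Java 8+; `TimestampMillisSketch` and its method names are hypothetical. The explicit pattern is there because `Instant.toString()` drops the fractional part when it is zero, so it would not print `.000` for the epoch.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not an Avro class): timestamp-millis <-> ISO-8601 text in UTC.
final class TimestampMillisSketch {
    // Always prints exactly three fractional digits, followed by a literal 'Z'.
    private static final DateTimeFormatter ISO_MILLIS =
            DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC);

    private TimestampMillisSketch() {
    }

    // Epoch milliseconds -> ISO-8601 string in UTC.
    static String fromMillis(long millisFromEpoch) {
        return ISO_MILLIS.format(Instant.ofEpochMilli(millisFromEpoch));
    }

    // ISO-8601 string (UTC, trailing 'Z') -> epoch milliseconds.
    static long toMillis(String timestamp) {
        return Instant.parse(timestamp).toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(fromMillis(520171631042L)); // 1986-06-26T12:07:11.042Z
        System.out.println(fromMillis(0L));            // 1970-01-01T00:00:00.000Z
    }
}
```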