如何将 avro 数据转换为日期(date)
How to convert avro data to date
我有一个如下所示的 avro 模式...
"namespace": "example.avro",
"type": "record",
"name": "UserDate",
"fields": [
{"name": "name", "type": "string"},
{"name": "date", "type": [{"type":"int","logicalType":"date"}, "null"]},
{"name": "datenotnullable", "type": {"type":"string","logicalType":"date"}}
]
当我检索 avro 数据时,我计划检测 logicalType,然后如果 logicalType 是“date”则转换为 date
我该如何着手实现这一目标?
// Plain generic reader: only the schema is supplied, no logical-type conversions are registered.
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
GenericRecord user = null;
// try-with-resources closes the Avro data file reader when the loop finishes.
try (DataFileReader<GenericRecord> records = new DataFileReader<>(file, datumReader)) {
    while (records.hasNext()) {
        // The previous record is handed back to next(); after the loop `user` holds the last record read.
        user = records.next(user);
    }
}
上面代码读取到的数据仍然是原始的基本类型(未按逻辑类型转换)
我不确定如何根据定义的逻辑类型获取此数据?
根据阅读,我认为我需要以某种方式实现转换,但不确定如何执行此操作...
有人能帮忙吗?
我在这里找到了答案
按如下方式修改代码后,问题解决了
// Register the custom conversion on a dedicated GenericData model and hand that model to the reader.
final GenericData genericData = new GenericData();
genericData.addLogicalTypeConversion(new MyTimestampConversion());
// Previously: new GenericDatumReader<GenericRecord>(schema) -- no GenericData model was passed in.
// Here schema1 is used as both the first and second schema argument.
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema1, schema1, genericData);
// NOTE(review): file2 is never referenced below; the reader opens `file`. Confirm which file is intended.
File file2 = new File("long.avro");
GenericRecord user = null;
// try-with-resources closes the Avro data file reader when the loop finishes.
try (DataFileReader<GenericRecord> records = new DataFileReader<>(file, datumReader)) {
    while (records.hasNext()) {
        // After the loop `user` holds the last record read.
        user = records.next(user);
    }
}
添加的转换类如下
public static class MyTimestampConversion extends Conversion<String> {
public MyTimestampConversion() {
}
public Class<String> getConvertedType() {
return String.class;
}
public String getLogicalTypeName() {
return "timestamp-millis";
}
public String fromLong(Long millisFromEpoch, Schema schema, LogicalType type) {
return (new DateTime(millisFromEpoch, DateTimeZone.UTC)).toString();
}
public Long toLong(String timestamp, Schema schema, LogicalType type) {
return new Long(timestamp);
}
public String fromCharSequence(CharSequence value, Schema schema, LogicalType type) {
return (new DateTime(value, DateTimeZone.UTC)).toString();
}
public CharSequence toCharSequence(String value, Schema schema, LogicalType type) {
return value;
}
}
结果(原始代码,转换未生效)
{"DateModified": 520171631042}
{"DateModified": 0}
结果(当前代码,转换生效)
{"DateModified": "1986-06-26T12:07:11.042Z"}
{"DateModified": "1970-01-01T00:00:00.000Z"}
我有一个如下所示的 avro 模式...
"namespace": "example.avro",
"type": "record",
"name": "UserDate",
"fields": [
{"name": "name", "type": "string"},
{"name": "date", "type": [{"type":"int","logicalType":"date"}, "null"]},
{"name": "datenotnullable", "type": {"type":"string","logicalType":"date"}}
]
当我检索 avro 数据时,我计划检测 logicalType,然后如果 logicalType 是“date”则转换为 date
我该如何着手实现这一目标?
// Plain generic reader: only the schema is supplied, no logical-type conversions are registered.
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
GenericRecord user = null;
// try-with-resources closes the Avro data file reader when the loop finishes.
try (DataFileReader<GenericRecord> records = new DataFileReader<>(file, datumReader)) {
    while (records.hasNext()) {
        // The previous record is handed back to next(); after the loop `user` holds the last record read.
        user = records.next(user);
    }
}
上面代码读取到的数据仍然是原始的基本类型(未按逻辑类型转换)
我不确定如何根据定义的逻辑类型获取此数据?
根据阅读,我认为我需要以某种方式实现转换,但不确定如何执行此操作...
有人能帮忙吗?
我在这里找到了答案
按如下方式修改代码后,问题解决了
// Register the custom conversion on a dedicated GenericData model and hand that model to the reader.
final GenericData genericData = new GenericData();
genericData.addLogicalTypeConversion(new MyTimestampConversion());
// Previously: new GenericDatumReader<GenericRecord>(schema) -- no GenericData model was passed in.
// Here schema1 is used as both the first and second schema argument.
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema1, schema1, genericData);
// NOTE(review): file2 is never referenced below; the reader opens `file`. Confirm which file is intended.
File file2 = new File("long.avro");
GenericRecord user = null;
// try-with-resources closes the Avro data file reader when the loop finishes.
try (DataFileReader<GenericRecord> records = new DataFileReader<>(file, datumReader)) {
    while (records.hasNext()) {
        // After the loop `user` holds the last record read.
        user = records.next(user);
    }
}
添加的转换类如下
public static class MyTimestampConversion extends Conversion<String> {
public MyTimestampConversion() {
}
public Class<String> getConvertedType() {
return String.class;
}
public String getLogicalTypeName() {
return "timestamp-millis";
}
public String fromLong(Long millisFromEpoch, Schema schema, LogicalType type) {
return (new DateTime(millisFromEpoch, DateTimeZone.UTC)).toString();
}
public Long toLong(String timestamp, Schema schema, LogicalType type) {
return new Long(timestamp);
}
public String fromCharSequence(CharSequence value, Schema schema, LogicalType type) {
return (new DateTime(value, DateTimeZone.UTC)).toString();
}
public CharSequence toCharSequence(String value, Schema schema, LogicalType type) {
return value;
}
}
结果(原始代码,转换未生效)
{"DateModified": 520171631042}
{"DateModified": 0}
结果(当前代码,转换生效)
{"DateModified": "1986-06-26T12:07:11.042Z"}
{"DateModified": "1970-01-01T00:00:00.000Z"}