Avro logicalType:将字符串日期转换为 epoch 毫秒时间戳(timestamp-millis)
Avro logicalType: converting a String date to an epoch timestamp-millis value
我有以下 schema(模式)字段定义:
{"name": "timestampstring", "type": [{"type":"string","logicalType":"timestamp-millis"}, "null"]},
我打算向该字段传入日期字符串,并通过转换把它转换为纪元毫秒(epoch millis)。
// Question snippet: schema1, schema2, file1 and datumReader are declared outside this excerpt.
// Build a record and store the date-time as plain text in the "timestampstring" field.
// NOTE(review): user2 is never written to file1 within this excerpt.
GenericRecord user2 = new GenericData.Record(schema1);
user2.put("timestampstring", "2019-01-26T12:00:40.931");
// Register the custom conversion on a dedicated GenericData model and hand that model to the reader.
final GenericData genericData = new GenericData();
genericData.addLogicalTypeConversion(new MyTimestampConversion());
datumReader = new GenericDatumReader<GenericRecord>(schema2, schema2, genericData);
// Iterate over every record in file1 and print it; the previously returned record is passed back into next().
GenericRecord user = null;
try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file1, datumReader)) {
while (dataFileReader.hasNext()) {
user = dataFileReader.next(user);
System.out.println(user);
}
}
// Conversion code
public static class MyTimestampConversion extends Conversion<Long> {
public MyTimestampConversion() {
}
public Class<Long> getConvertedType() {
return Long.class;
}
public String getLogicalTypeName() {
return "timestamp-millis";
}
public Long fromCharSequence(CharSequence value, Schema schema, LogicalType type) {
return 123L;
}
}
但是这段代码不起作用……我希望它能把值转换为毫秒时间戳(timestamp millis;上面的示例中我暂时硬编码返回 123L)。
有人能帮忙吗?
回顾一下:我最终通过创建自己的逻辑类型解决了这个问题。用“timestamp-millis”这个 logicalType 似乎行不通——按照 Avro 规范,timestamp-millis 只能标注 long 类型,标注在 string 上的逻辑类型会被忽略。所以我创建了自己的 logicalType……
package example;
import org.apache.avro.Conversion;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.*;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.ResolvingDecoder;
import org.joda.time.DateTime;
import java.io.File;
import java.io.IOException;
public class AvroWriteDateUtcToEpochMili {
/**
 * Demo entry point: registers the custom logical type, writes three sample records to
 * {@code users.avro}, then reads the file back twice — first with a plain
 * {@link GenericDatumReader}, then with a {@link MyReader} whose data model has
 * {@link MyStringTimestampConversion} registered.
 *
 * @param args unused
 * @throws IOException if the schema file or the Avro data file cannot be read or written
 */
public static void main(String[] args) throws IOException {
    final boolean isRegisterNewLogicalType = true;
    final boolean isWrite = true;

    if (isRegisterNewLogicalType) {
        registerLogicalType();
    }

    Schema schema1 = new Parser().parse(new File("./userdate_modified_string.avsc"));
    File file1 = new File("users.avro");
    if (isWrite) {
        writeSampleUsers(schema1, file1);
    }

    // First pass: no conversion is registered on this reader; the schema is printed once up front.
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema1);
    printRecords(file1, datumReader, true);

    // Second pass: same file, but the reader's data model carries the string conversion.
    System.out.println("//AFTER");
    Schema schema2 = new Parser().parse(new File("./userdate_modified_string.avsc"));
    final GenericData genericData = new GenericData();
    genericData.addLogicalTypeConversion(new MyStringTimestampConversion());
    datumReader = new MyReader<GenericRecord>(schema2, schema2, genericData);
    printRecords(file1, datumReader, false);
}

/** Registers a factory that returns one shared instance of the custom logical type. */
private static void registerLogicalType() {
    LogicalTypes.register(
            UtcDateTimeToTimestampMilisLogicalType.CONVERT_LONG_TYPE_NAME,
            new LogicalTypes.LogicalTypeFactory() {
                private final LogicalType convertLongLogicalType =
                        new UtcDateTimeToTimestampMilisLogicalType();

                @Override
                public LogicalType fromSchema(Schema schema) {
                    return convertLongLogicalType;
                }
            });
}

/**
 * Writes three sample records to {@code target} using a writer that has
 * {@link MyStringTimestampConversion} registered.
 *
 * @param schema record schema used for the records and the file header
 * @param target Avro container file to create
 * @throws IOException if the file cannot be created or a record cannot be appended
 */
private static void writeSampleUsers(Schema schema, File target) throws IOException {
    // user1 supplies the logical-type field as date-time text.
    GenericRecord user1 = new GenericData.Record(schema);
    user1.put("timestamplong", "2019-07-09T04:31:45.281Z");
    user1.put("timestampstring", "2019-07-09T04:31:45.281Z");

    // user2 supplies the logical-type field as a raw long instead.
    GenericRecord user2 = new GenericData.Record(schema);
    user2.put("timestamplong", 2L);
    user2.put("timestampstring", (new DateTime(2L)).toString());

    // user3 uses the current time for both fields.
    var currentDateTime = DateTime.now();
    GenericRecord user3 = new GenericData.Record(schema);
    user3.put("timestamplong", currentDateTime.toString());
    user3.put("timestampstring", currentDateTime.toString());

    final GenericData genericData2 = new GenericData();
    genericData2.addLogicalTypeConversion(new MyStringTimestampConversion());
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema, genericData2);

    // try-with-resources closes the file even when create/append throws.
    try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter)) {
        dataFileWriter.create(schema, target);
        dataFileWriter.append(user1);
        dataFileWriter.append(user2);
        dataFileWriter.append(user3);
    }
}

/**
 * Prints every record of {@code source} to standard output.
 *
 * @param source           Avro container file to read
 * @param datumReader      reader used to decode each record
 * @param printSchemaFirst when {@code true}, the first record's schema is printed before it
 * @throws IOException if the file cannot be opened or decoded
 */
private static void printRecords(File source, DatumReader<GenericRecord> datumReader,
        boolean printSchemaFirst) throws IOException {
    boolean schemaPending = printSchemaFirst;
    GenericRecord user = null;
    try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(source, datumReader)) {
        while (dataFileReader.hasNext()) {
            // The previously returned record is handed back to next().
            user = dataFileReader.next(user);
            if (schemaPending) {
                System.out.println(user.getSchema());
                schemaPending = false;
            }
            System.out.println(user);
        }
    }
}
/**
 * Datum reader that, after reading a raw value, applies the conversion registered on its
 * {@link GenericData} model for the expected schema's logical type (if any).
 *
 * @param <G> record type produced by this reader
 */
public static class MyReader<G extends IndexedRecord> extends GenericDatumReader<G> {

    /** Creates a reader with no schemas set. */
    public MyReader() {
        super();
    }

    /**
     * @param writer schema the data was written with
     * @param reader schema the caller expects
     * @param data   data model holding the registered conversions
     */
    public MyReader(Schema writer, Schema reader, GenericData data) {
        super(writer, reader, data);
    }

    /**
     * Reads one value and converts it when both a logical type and a matching conversion exist.
     *
     * @param old      previous value passed on to the raw read
     * @param expected schema of the value being read
     * @param in       decoder positioned at the value
     * @return the converted value, or the raw datum when no conversion applies
     * @throws IOException if decoding fails
     */
    @Override
    protected Object read(Object old, Schema expected, ResolvingDecoder in) throws IOException {
        Object datum = readWithoutConversion(old, expected, in);

        LogicalType logicalType = expected.getLogicalType();
        if (logicalType == null) {
            return datum;
        }

        Conversion<?> conversion = getData().getConversionFor(logicalType);
        if (conversion == null) {
            return datum;
        }

        return convert(datum, expected, logicalType, conversion);
    }
}
/**
 * Conversion between a {@code long} of epoch milliseconds and its textual date-time form,
 * keyed by {@link UtcDateTimeToTimestampMilisLogicalType#CONVERT_LONG_TYPE_NAME}.
 */
public static class MyStringTimestampConversion extends Conversion<String> {

    /** @return the Java type handed to and received from callers */
    @Override
    public Class<String> getConvertedType() {
        return String.class;
    }

    /** @return the custom logical type name (not "timestamp-millis") */
    @Override
    public String getLogicalTypeName() {
        return UtcDateTimeToTimestampMilisLogicalType.CONVERT_LONG_TYPE_NAME;
    }

    /**
     * Renders stored epoch milliseconds as text via Joda-Time's {@code DateTime#toString()}.
     * NOTE(review): the offset in the result appears to follow the JVM default time zone
     * (see the sample output) — confirm.
     *
     * @param millisFromEpoch stored value
     * @param schema          schema of the field (unused)
     * @param type            logical type of the field (unused)
     * @return the textual date-time
     */
    @Override
    public String fromLong(Long millisFromEpoch, Schema schema, LogicalType type) {
        // The boxed argument is passed through unchanged so the same constructor overload is chosen.
        DateTime rendered = new DateTime(millisFromEpoch);
        return rendered.toString();
    }

    /**
     * Parses date-time text with {@code DateTime.parse} and returns its epoch milliseconds.
     *
     * @param value  date-time text to parse
     * @param schema schema of the field (unused)
     * @param type   logical type of the field (unused)
     * @return milliseconds since the epoch for the parsed instant
     */
    @Override
    public Long toLong(String value, Schema schema, LogicalType type) {
        return DateTime.parse(value).getMillis();
    }
}
}
逻辑类型
/**
 * Custom Avro logical type named {@value #CONVERT_LONG_TYPE_NAME}; valid only on schemas whose
 * underlying type is {@code long}.
 */
public class UtcDateTimeToTimestampMilisLogicalType extends LogicalType {

    /** The key used in schemas ({@code "logicalType"}) and for registration/lookup. */
    public static final String CONVERT_LONG_TYPE_NAME = "utc-to-epoch-millis";

    /** Creates the logical type under {@link #CONVERT_LONG_TYPE_NAME}. */
    public UtcDateTimeToTimestampMilisLogicalType() {
        super(CONVERT_LONG_TYPE_NAME);
    }

    /**
     * Runs the base validation, then rejects any schema that is not a {@code long}.
     *
     * @param schema schema annotated with this logical type
     * @throws IllegalArgumentException if the schema's type is not {@code LONG}
     */
    @Override
    public void validate(Schema schema) {
        super.validate(schema);
        if (schema.getType() != Schema.Type.LONG) {
            // The message names the type that is actually checked (it previously said "bytes").
            throw new IllegalArgumentException(
                    "Logical type '" + CONVERT_LONG_TYPE_NAME + "' must be backed by long");
        }
    }
}
Schema(模式)
{
"namespace": "example.avro.modified.string",
"type": "record",
"name": "UserDate",
"fields": [
{
"name": "timestamplong",
"type":
{
"type": "long",
"logicalType": "utc-to-epoch-millis"
}
},
{
"name": "timestampstring",
"type": "string"
}
]
}
结果
{"type":"record","name":"UserDate","namespace":"example.avro.modified.string","fields":[{"name":"timestamplong","type":{"type":"long","logicalType":"utc-to-epoch-millis"}},{"name":"timestampstring","type":"string"}]}
{"timestamplong": 1562646705281, "timestampstring": "2019-07-09T04:31:45.281Z"}
{"timestamplong": 2, "timestampstring": "1970-01-01T07:30:00.002+07:30"}
{"timestamplong": 1601616694713, "timestampstring": "2020-10-02T13:31:34.713+08:00"}
//AFTER
{"timestamplong": "2019-07-09T12:31:45.281+08:00", "timestampstring": "2019-07-09T04:31:45.281Z"}
{"timestamplong": "1970-01-01T07:30:00.002+07:30", "timestampstring": "1970-01-01T07:30:00.002+07:30"}
{"timestamplong": "2020-10-02T13:31:34.713+08:00", "timestampstring": "2020-10-02T13:31:34.713+08:00"}
Process finished with exit code 0
我有以下 schema(模式)字段定义:
{"name": "timestampstring", "type": [{"type":"string","logicalType":"timestamp-millis"}, "null"]},
我打算向该字段传入日期字符串,并通过转换把它转换为纪元毫秒(epoch millis)。
// Question snippet: schema1, schema2, file1 and datumReader are declared outside this excerpt.
// Build a record and store the date-time as plain text in the "timestampstring" field.
// NOTE(review): user2 is never written to file1 within this excerpt.
GenericRecord user2 = new GenericData.Record(schema1);
user2.put("timestampstring", "2019-01-26T12:00:40.931");
// Register the custom conversion on a dedicated GenericData model and hand that model to the reader.
final GenericData genericData = new GenericData();
genericData.addLogicalTypeConversion(new MyTimestampConversion());
datumReader = new GenericDatumReader<GenericRecord>(schema2, schema2, genericData);
// Iterate over every record in file1 and print it; the previously returned record is passed back into next().
GenericRecord user = null;
try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file1, datumReader)) {
while (dataFileReader.hasNext()) {
user = dataFileReader.next(user);
System.out.println(user);
}
}
// Conversion code
public static class MyTimestampConversion extends Conversion<Long> {
public MyTimestampConversion() {
}
public Class<Long> getConvertedType() {
return Long.class;
}
public String getLogicalTypeName() {
return "timestamp-millis";
}
public Long fromCharSequence(CharSequence value, Schema schema, LogicalType type) {
return 123L;
}
}
但是这段代码不起作用……我希望它能把值转换为毫秒时间戳(timestamp millis;上面的示例中我暂时硬编码返回 123L)。
有人能帮忙吗?
回顾(解决方案代码如下):
package example;
import org.apache.avro.Conversion;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.*;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.ResolvingDecoder;
import org.joda.time.DateTime;
import java.io.File;
import java.io.IOException;
public class AvroWriteDateUtcToEpochMili {
/**
 * Demo entry point: registers the custom logical type, writes three sample records to
 * {@code users.avro}, then reads the file back twice — first with a plain
 * {@link GenericDatumReader}, then with a {@link MyReader} whose data model has
 * {@link MyStringTimestampConversion} registered.
 *
 * @param args unused
 * @throws IOException if the schema file or the Avro data file cannot be read or written
 */
public static void main(String[] args) throws IOException {
    final boolean isRegisterNewLogicalType = true;
    final boolean isWrite = true;

    if (isRegisterNewLogicalType) {
        registerLogicalType();
    }

    Schema schema1 = new Parser().parse(new File("./userdate_modified_string.avsc"));
    File file1 = new File("users.avro");
    if (isWrite) {
        writeSampleUsers(schema1, file1);
    }

    // First pass: no conversion is registered on this reader; the schema is printed once up front.
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema1);
    printRecords(file1, datumReader, true);

    // Second pass: same file, but the reader's data model carries the string conversion.
    System.out.println("//AFTER");
    Schema schema2 = new Parser().parse(new File("./userdate_modified_string.avsc"));
    final GenericData genericData = new GenericData();
    genericData.addLogicalTypeConversion(new MyStringTimestampConversion());
    datumReader = new MyReader<GenericRecord>(schema2, schema2, genericData);
    printRecords(file1, datumReader, false);
}

/** Registers a factory that returns one shared instance of the custom logical type. */
private static void registerLogicalType() {
    LogicalTypes.register(
            UtcDateTimeToTimestampMilisLogicalType.CONVERT_LONG_TYPE_NAME,
            new LogicalTypes.LogicalTypeFactory() {
                private final LogicalType convertLongLogicalType =
                        new UtcDateTimeToTimestampMilisLogicalType();

                @Override
                public LogicalType fromSchema(Schema schema) {
                    return convertLongLogicalType;
                }
            });
}

/**
 * Writes three sample records to {@code target} using a writer that has
 * {@link MyStringTimestampConversion} registered.
 *
 * @param schema record schema used for the records and the file header
 * @param target Avro container file to create
 * @throws IOException if the file cannot be created or a record cannot be appended
 */
private static void writeSampleUsers(Schema schema, File target) throws IOException {
    // user1 supplies the logical-type field as date-time text.
    GenericRecord user1 = new GenericData.Record(schema);
    user1.put("timestamplong", "2019-07-09T04:31:45.281Z");
    user1.put("timestampstring", "2019-07-09T04:31:45.281Z");

    // user2 supplies the logical-type field as a raw long instead.
    GenericRecord user2 = new GenericData.Record(schema);
    user2.put("timestamplong", 2L);
    user2.put("timestampstring", (new DateTime(2L)).toString());

    // user3 uses the current time for both fields.
    var currentDateTime = DateTime.now();
    GenericRecord user3 = new GenericData.Record(schema);
    user3.put("timestamplong", currentDateTime.toString());
    user3.put("timestampstring", currentDateTime.toString());

    final GenericData genericData2 = new GenericData();
    genericData2.addLogicalTypeConversion(new MyStringTimestampConversion());
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema, genericData2);

    // try-with-resources closes the file even when create/append throws.
    try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter)) {
        dataFileWriter.create(schema, target);
        dataFileWriter.append(user1);
        dataFileWriter.append(user2);
        dataFileWriter.append(user3);
    }
}

/**
 * Prints every record of {@code source} to standard output.
 *
 * @param source           Avro container file to read
 * @param datumReader      reader used to decode each record
 * @param printSchemaFirst when {@code true}, the first record's schema is printed before it
 * @throws IOException if the file cannot be opened or decoded
 */
private static void printRecords(File source, DatumReader<GenericRecord> datumReader,
        boolean printSchemaFirst) throws IOException {
    boolean schemaPending = printSchemaFirst;
    GenericRecord user = null;
    try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(source, datumReader)) {
        while (dataFileReader.hasNext()) {
            // The previously returned record is handed back to next().
            user = dataFileReader.next(user);
            if (schemaPending) {
                System.out.println(user.getSchema());
                schemaPending = false;
            }
            System.out.println(user);
        }
    }
}
/**
 * Datum reader that, after reading a raw value, applies the conversion registered on its
 * {@link GenericData} model for the expected schema's logical type (if any).
 *
 * @param <G> record type produced by this reader
 */
public static class MyReader<G extends IndexedRecord> extends GenericDatumReader<G> {

    /** Creates a reader with no schemas set. */
    public MyReader() {
        super();
    }

    /**
     * @param writer schema the data was written with
     * @param reader schema the caller expects
     * @param data   data model holding the registered conversions
     */
    public MyReader(Schema writer, Schema reader, GenericData data) {
        super(writer, reader, data);
    }

    /**
     * Reads one value and converts it when both a logical type and a matching conversion exist.
     *
     * @param old      previous value passed on to the raw read
     * @param expected schema of the value being read
     * @param in       decoder positioned at the value
     * @return the converted value, or the raw datum when no conversion applies
     * @throws IOException if decoding fails
     */
    @Override
    protected Object read(Object old, Schema expected, ResolvingDecoder in) throws IOException {
        Object datum = readWithoutConversion(old, expected, in);

        LogicalType logicalType = expected.getLogicalType();
        if (logicalType == null) {
            return datum;
        }

        Conversion<?> conversion = getData().getConversionFor(logicalType);
        if (conversion == null) {
            return datum;
        }

        return convert(datum, expected, logicalType, conversion);
    }
}
/**
 * Conversion between a {@code long} of epoch milliseconds and its textual date-time form,
 * keyed by {@link UtcDateTimeToTimestampMilisLogicalType#CONVERT_LONG_TYPE_NAME}.
 */
public static class MyStringTimestampConversion extends Conversion<String> {

    /** @return the Java type handed to and received from callers */
    @Override
    public Class<String> getConvertedType() {
        return String.class;
    }

    /** @return the custom logical type name (not "timestamp-millis") */
    @Override
    public String getLogicalTypeName() {
        return UtcDateTimeToTimestampMilisLogicalType.CONVERT_LONG_TYPE_NAME;
    }

    /**
     * Renders stored epoch milliseconds as text via Joda-Time's {@code DateTime#toString()}.
     * NOTE(review): the offset in the result appears to follow the JVM default time zone
     * (see the sample output) — confirm.
     *
     * @param millisFromEpoch stored value
     * @param schema          schema of the field (unused)
     * @param type            logical type of the field (unused)
     * @return the textual date-time
     */
    @Override
    public String fromLong(Long millisFromEpoch, Schema schema, LogicalType type) {
        // The boxed argument is passed through unchanged so the same constructor overload is chosen.
        DateTime rendered = new DateTime(millisFromEpoch);
        return rendered.toString();
    }

    /**
     * Parses date-time text with {@code DateTime.parse} and returns its epoch milliseconds.
     *
     * @param value  date-time text to parse
     * @param schema schema of the field (unused)
     * @param type   logical type of the field (unused)
     * @return milliseconds since the epoch for the parsed instant
     */
    @Override
    public Long toLong(String value, Schema schema, LogicalType type) {
        return DateTime.parse(value).getMillis();
    }
}
}
逻辑类型
/**
 * Custom Avro logical type named {@value #CONVERT_LONG_TYPE_NAME}; valid only on schemas whose
 * underlying type is {@code long}.
 */
public class UtcDateTimeToTimestampMilisLogicalType extends LogicalType {

    /** The key used in schemas ({@code "logicalType"}) and for registration/lookup. */
    public static final String CONVERT_LONG_TYPE_NAME = "utc-to-epoch-millis";

    /** Creates the logical type under {@link #CONVERT_LONG_TYPE_NAME}. */
    public UtcDateTimeToTimestampMilisLogicalType() {
        super(CONVERT_LONG_TYPE_NAME);
    }

    /**
     * Runs the base validation, then rejects any schema that is not a {@code long}.
     *
     * @param schema schema annotated with this logical type
     * @throws IllegalArgumentException if the schema's type is not {@code LONG}
     */
    @Override
    public void validate(Schema schema) {
        super.validate(schema);
        if (schema.getType() != Schema.Type.LONG) {
            // The message names the type that is actually checked (it previously said "bytes").
            throw new IllegalArgumentException(
                    "Logical type '" + CONVERT_LONG_TYPE_NAME + "' must be backed by long");
        }
    }
}
Schema(模式)
{
"namespace": "example.avro.modified.string",
"type": "record",
"name": "UserDate",
"fields": [
{
"name": "timestamplong",
"type":
{
"type": "long",
"logicalType": "utc-to-epoch-millis"
}
},
{
"name": "timestampstring",
"type": "string"
}
]
}
结果
{"type":"record","name":"UserDate","namespace":"example.avro.modified.string","fields":[{"name":"timestamplong","type":{"type":"long","logicalType":"utc-to-epoch-millis"}},{"name":"timestampstring","type":"string"}]}
{"timestamplong": 1562646705281, "timestampstring": "2019-07-09T04:31:45.281Z"}
{"timestamplong": 2, "timestampstring": "1970-01-01T07:30:00.002+07:30"}
{"timestamplong": 1601616694713, "timestampstring": "2020-10-02T13:31:34.713+08:00"}
//AFTER
{"timestamplong": "2019-07-09T12:31:45.281+08:00", "timestampstring": "2019-07-09T04:31:45.281Z"}
{"timestamplong": "1970-01-01T07:30:00.002+07:30", "timestampstring": "1970-01-01T07:30:00.002+07:30"}
{"timestamplong": "2020-10-02T13:31:34.713+08:00", "timestampstring": "2020-10-02T13:31:34.713+08:00"}
Process finished with exit code 0