如何在不添加架构名称的情况下从 Avro GenericRecord 转换为 JSON?
How to convert from Avro GenericRecord to JSON without adding schema name?
我有 2 个模式:
Event.avsc:
{
"type": "record",
"namespace": "com.onemount.jobs.transform.schema.avro",
"name": "Event",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "mtp_interest_submit",
"type": ["null", "InterestSubmitParam"],
"default": null
}
]
}
InterestSubmitParam.avsc:
{
"type": "record",
"namespace": "com.onemount.jobs.transform.schema.avro",
"name": "InterestSubmitParam",
"fields": [
{
"name": "interest",
"type": {
"type": "array",
"items": "string"
}
}
]
}
我正在使用来自 Kafka Confluent(使用 specific.avro.reader=false
)的 Avro 消息,需要从 GenericRecord
转换为 ObjectNode
。这是结果:
{
"id": "c8b76e58-9803-4c78-9f82-a185bda1cabf",
"mtp_interest_submit": {
"com.onemount.jobs.transform.schema.avro.InterestSubmitParam": {
"interest": [
"fashion",
"travel"
]
}
}
}
但我预计它应该是:
{
"id": "c8b76e58-9803-4c78-9f82-a185bda1cabf",
"mtp_interest_submit": {
"interest": [
"fashion",
"travel"
]
}
}
我该如何解决。这是我的转换器代码:
GenericRecord genericRecord = ...
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(genericRecord.getSchema());
JsonEncoder encoder = EncoderFactory.get().jsonEncoder(genericRecord.getSchema(), outputStream);
writer.write(genericRecord, encoder);
encoder.flush();
return new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
}
非常感谢!
通过使用jackson-dataformat-avro
,问题已解决:
ObjectMapper mapper = new ObjectMapper(new AvroFactory());
GenericRecord genericRecord = ...;
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(genericRecord.getSchema());
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
writer.write(genericRecord, encoder);
encoder.flush();
byte[] bytes = outputStream.toByteArray();
return mapper.readerFor(ObjectNode.class)
.with(new AvroSchema(genericRecord.getSchema()))
.readValue(bytes);
}
pom.xml:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-avro</artifactId>
<version>2.12.3</version>
</dependency>
我有 2 个模式:
Event.avsc:
{
"type": "record",
"namespace": "com.onemount.jobs.transform.schema.avro",
"name": "Event",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "mtp_interest_submit",
"type": ["null", "InterestSubmitParam"],
"default": null
}
]
}
InterestSubmitParam.avsc:
{
"type": "record",
"namespace": "com.onemount.jobs.transform.schema.avro",
"name": "InterestSubmitParam",
"fields": [
{
"name": "interest",
"type": {
"type": "array",
"items": "string"
}
}
]
}
我正在使用来自 Kafka Confluent(使用 specific.avro.reader=false
)的 Avro 消息,需要从 GenericRecord
转换为 ObjectNode
。这是结果:
{
"id": "c8b76e58-9803-4c78-9f82-a185bda1cabf",
"mtp_interest_submit": {
"com.onemount.jobs.transform.schema.avro.InterestSubmitParam": {
"interest": [
"fashion",
"travel"
]
}
}
}
但我预计它应该是:
{
"id": "c8b76e58-9803-4c78-9f82-a185bda1cabf",
"mtp_interest_submit": {
"interest": [
"fashion",
"travel"
]
}
}
我该如何解决。这是我的转换器代码:
GenericRecord genericRecord = ...
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(genericRecord.getSchema());
JsonEncoder encoder = EncoderFactory.get().jsonEncoder(genericRecord.getSchema(), outputStream);
writer.write(genericRecord, encoder);
encoder.flush();
return new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
}
非常感谢!
通过使用jackson-dataformat-avro
,问题已解决:
ObjectMapper mapper = new ObjectMapper(new AvroFactory());
GenericRecord genericRecord = ...;
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(genericRecord.getSchema());
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
writer.write(genericRecord, encoder);
encoder.flush();
byte[] bytes = outputStream.toByteArray();
return mapper.readerFor(ObjectNode.class)
.with(new AvroSchema(genericRecord.getSchema()))
.readValue(bytes);
}
pom.xml:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-avro</artifactId>
<version>2.12.3</version>
</dependency>