How to convert from Avro GenericRecord to JSON without adding schema name?

I have two schemas:

Event.avsc:

{
  "type": "record",
  "namespace": "com.onemount.jobs.transform.schema.avro",
  "name": "Event",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "mtp_interest_submit",
      "type": ["null", "InterestSubmitParam"],
      "default": null
    }
  ]
}

InterestSubmitParam.avsc:

{
  "type": "record",
  "namespace": "com.onemount.jobs.transform.schema.avro",
  "name": "InterestSubmitParam",
  "fields": [
    {
      "name": "interest",
      "type": {
        "type": "array",
        "items": "string"
      }
    }
  ]
}
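
Since Event.avsc references InterestSubmitParam by name, the two files have to be parsed with one and the same Schema.Parser, dependency first, whenever the resolved schema is needed locally. A minimal sketch, assuming both files sit in the working directory (the paths are illustrative):

import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;

public class SchemaLoader {
    public static Schema loadEventSchema() throws IOException {
        // A single Parser instance remembers named types, so the dependency
        // must be parsed before the schema that references it.
        Schema.Parser parser = new Schema.Parser();
        parser.parse(new File("InterestSubmitParam.avsc")); // illustrative path
        return parser.parse(new File("Event.avsc"));        // illustrative path
    }
}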

I am consuming Avro messages from Confluent Kafka (with specific.avro.reader=false) and need to convert the GenericRecord to an ObjectNode. This is the result I get:

{
  "id": "c8b76e58-9803-4c78-9f82-a185bda1cabf",
  "mtp_interest_submit": {
    "com.onemount.jobs.transform.schema.avro.InterestSubmitParam": {
      "interest": [
        "fashion",
        "travel"
      ]
    }
  }
}

But I expected it to be:

{
  "id": "c8b76e58-9803-4c78-9f82-a185bda1cabf",
  "mtp_interest_submit": {
    "interest": [
      "fashion",
      "travel"
    ]
  }
}
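
For context, the GenericRecord above comes from the Confluent Avro deserializer; a minimal consumer-configuration sketch along these lines (broker, group id and registry URL are placeholders) yields GenericRecord values precisely because specific.avro.reader stays at false:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");          // placeholder
props.put("group.id", "transform-job");                    // placeholder
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081"); // placeholder
props.put("specific.avro.reader", "false");                // default; values arrive as GenericRecord

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);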

How can I fix this? Here is my converter code:

GenericRecord genericRecord = ...
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
    // Serialize the record with Avro's own JsonEncoder and return the JSON text.
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(genericRecord.getSchema());
    JsonEncoder encoder = EncoderFactory.get().jsonEncoder(genericRecord.getSchema(), outputStream);
    writer.write(genericRecord, encoder);
    encoder.flush();

    return new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
}

Thanks a lot!

The problem was solved by using jackson-dataformat-avro. Avro's JsonEncoder implements the Avro JSON encoding, which always wraps a non-null union branch in its fully qualified type name; Jackson's Avro reader, by contrast, decodes the binary bytes straight into a plain JSON tree:

ObjectMapper mapper = new ObjectMapper(new AvroFactory());

GenericRecord genericRecord = ...;
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
    // First serialize the GenericRecord to Avro binary...
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(genericRecord.getSchema());
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
    writer.write(genericRecord, encoder);
    encoder.flush();

    byte[] bytes = outputStream.toByteArray();

    // ...then let Jackson's Avro reader decode it straight into an ObjectNode;
    // the union branch is resolved during decoding, so no type-name wrapper appears.
    return mapper.readerFor(ObjectNode.class)
            .with(new AvroSchema(genericRecord.getSchema()))
            .readValue(bytes);
}
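
For completeness, a minimal usage sketch (convertToObjectNode is a hypothetical wrapper around the snippet above); a plain JSON ObjectMapper then serializes the tree without any union type names:

ObjectNode node = convertToObjectNode(genericRecord); // hypothetical method wrapping the code above

String json = new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(node);
System.out.println(json); // prints the expected JSON, without the InterestSubmitParam wrapper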

pom.xml:

    <dependency>
        <groupId>com.fasterxml.jackson.dataformat</groupId>
        <artifactId>jackson-dataformat-avro</artifactId>
        <version>2.12.3</version>
    </dependency>