Kafka message field is being nested when type has null or default is present

I am using the Avro serializer to push messages to a Kafka topic. I generated the Java class from the Avro schema below:

  {
    "type": "record",
    "name": "CommonHeader",
    "fields": [
      {
        "name": "domain",
        "type": "string"
      },
      {
        "name": "eventSource",
        "type": "string"
      },
      {
        "name": "eventEntity",
        "type": "string"
      },
      {
        "name": "eventType",
        "type": "string"
      },
      {
        "name": "tsEventPublished",
        "type": "string"
      },
      {
        "name": "tsEntityCreated",
        "type": [
          "null",
          "string"
        ],
        "default": null
      },
      {
        "name": "tsEntityUpdated",
        "type": [
          "null",
          "string"
        ],
        "default": null
      },
      {
        "name": "isSynthetic",
        "type": [
          "boolean"
        ],
        "default": false
      },
      {
        "name": "correlationId",
        "type": "string"
      }
    ]
  }

The generated class, CommonHeader.java, is attached.

When I write a message to Kafka, it looks like this:

  {
    "domain": "product",
    "eventSource": "source",
    "eventEntity": "entity",
    "eventType": "Create",
    "tsEventPublished": "2021-11-04T02:39:42.261Z",
    "tsEntityCreated": {
      "string": "2021-11-21T21:25:04.000Z"
    },
    "tsEntityUpdated": {
      "string": "2011-11-04T00:11:52.000Z"
    },
    "isSynthetic": {
      "boolean": false
    },
    "correlationId": "id"
  }

Why are the values of "isSynthetic" and "tsEntityUpdated" nested? How can I avoid this?

Spring Boot properties for Kafka:

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.properties.auto.register.schemas=false
spring.kafka.properties.specific.avro.reader=true

Plugin information:

            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <id>schemas</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

According to the official documentation, this is how Avro works. The fields tsEntityCreated and tsEntityUpdated

"type": [
   "null",
   "string"
]

and isSynthetic

"type": [
   "boolean"
]

are defined as unions. There are special rules for the JSON encoding of unions:

The value of a union is encoded in JSON as follows:

  • if its type is null, then it is encoded as a JSON null;
  • otherwise it is encoded as a JSON object with one name/value pair whose name is the type's name and whose value is the recursively encoded value. For Avro's named types (record, fixed or enum) the user-specified name is used, for other types the type name is used.

For example, the union schema ["null","string"] would encode:

  • null as null;
  • the string "a" as {"string": "a"};
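
The rule quoted above can be sketched in a few lines of Java. This is only an illustration of the encoding rule for a handful of primitive types, not Avro's own implementation; the class UnionJsonSketch and its methods are hypothetical names, and string escaping is deliberately left out.

```java
// Illustration only: mimics the union JSON-encoding rule quoted above for a
// few primitive types. This is NOT Avro's implementation; UnionJsonSketch and
// its methods are hypothetical names introduced for this example.
final class UnionJsonSketch {

    // Maps a Java value to the Avro type name used as the wrapper key.
    static String avroTypeName(Object value) {
        if (value instanceof String) return "string";
        if (value instanceof Boolean) return "boolean";
        if (value instanceof Integer) return "int";
        if (value instanceof Long) return "long";
        throw new IllegalArgumentException("unsupported type: " + value.getClass());
    }

    // Encodes a value whose schema type is NOT a union: no wrapper object.
    static String encodePlain(Object value) {
        if (value instanceof String) {
            return "\"" + value + "\""; // no escaping, sketch only
        }
        return String.valueOf(value);
    }

    // Encodes a value whose schema type IS a union: null stays a JSON null,
    // anything else becomes a one-entry object keyed by the type name.
    static String encodeUnion(Object value) {
        if (value == null) {
            return "null";
        }
        return "{\"" + avroTypeName(value) + "\": " + encodePlain(value) + "}";
    }

    public static void main(String[] args) {
        System.out.println(encodeUnion(null));   // tsEntityCreated when unset
        System.out.println(encodeUnion("a"));    // ["null","string"] with a value
        System.out.println(encodeUnion(false));  // isSynthetic declared as ["boolean"]
        System.out.println(encodePlain(false));  // isSynthetic declared as "boolean"
    }
}
```

Under this rule, isSynthetic is wrapped only because its type is declared as the one-element union ["boolean"]; a field declared with the plain type "boolean" is encoded without the wrapper. For the ["null","string"] fields, the wrapper is part of Avro's JSON encoding of a non-null union value.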