Kafka message field is being nested when type has null or default is present
I am using the Avro serializer to push messages to a Kafka topic. I generated the Java class from the Avro schema below:
{
"type": "record",
"name": "CommonHeader",
"fields": [
{
"name": "domain",
"type": "string"
},
{
"name": "eventSource",
"type": "string"
},
{
"name": "eventEntity",
"type": "string"
},
{
"name": "eventType",
"type": "string"
},
{
"name": "tsEventPublished",
"type": "string"
},
{
"name": "tsEntityCreated",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "tsEntityUpdated",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "isSynthetic",
"type": [
"boolean"
],
"default": false
},
{
"name": "correlationId",
"type": "string"
}
]
}
The generated class, CommonHeader.java, is attached here.
When I write a message to Kafka, it looks like this:
{
"domain": "product",
"eventSource": "source",
"eventEntity": "entity",
"eventType": "Create",
"tsEventPublished": "2021-11-04T02:39:42.261Z",
"tsEntityCreated": {
"string": "2021-11-21T21:25:04.000Z"
},
"tsEntityUpdated": {
"string": "2011-11-04T00:11:52.000Z"
},
"isSynthetic": {
"boolean": false
},
"correlationId": "id"
}
Why are the values of "isSynthetic" and "tsEntityUpdated" nested? How can I avoid this?
Spring Boot properties for Kafka:
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.properties.auto.register.schemas=false
spring.kafka.properties.specific.avro.reader=true
Plugin information:
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<id>schemas</id>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
According to the official documentation, this is how Avro works. The fields tsEntityCreated and tsEntityUpdated
"type": [
"null",
"string"
]
and isSynthetic
"type": [
"boolean"
]
are defined as unions. There are special rules for unions in JSON encoding:
The value of a union is encoded in JSON as follows:
- if its type is null, then it is encoded as a JSON null;
- otherwise it is encoded as a JSON object with one name/value pair whose name is the type's name and whose value is the recursively encoded value. For Avro's named types (record, fixed or enum) the user-specified name is used, for other types the type name is used.
For example, the union schema ["null","string"] would encode:
- null as null;
- the string "a" as {"string": "a"};
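The rule quoted above can be sketched in plain Java. This is only an illustration of the encoding rule, not Avro's own encoder: the class UnionJsonSketch and its helpers avroTypeName, jsonLiteral, encodeUnion and encodePlain are hypothetical names made up for this example, and it handles only null, string and boolean values without any string escaping.

```java
import java.util.List;

public class UnionJsonSketch {

    // Hypothetical helper: maps a Java value to the Avro primitive type name.
    static String avroTypeName(Object value) {
        if (value == null) return "null";
        if (value instanceof String) return "string";
        if (value instanceof Boolean) return "boolean";
        throw new IllegalArgumentException("unsupported in this sketch: " + value.getClass());
    }

    // Renders a value as a JSON literal (no escaping; enough for this illustration).
    static String jsonLiteral(Object value) {
        if (value instanceof String) return "\"" + value + "\"";
        return String.valueOf(value);
    }

    // Union rule: null stays a JSON null, anything else is wrapped in an
    // object with one name/value pair whose name is the branch type name.
    static String encodeUnion(List<String> branches, Object value) {
        String type = avroTypeName(value);
        if (!branches.contains(type)) {
            throw new IllegalArgumentException(type + " is not a branch of " + branches);
        }
        if (value == null) return "null";
        return "{\"" + type + "\": " + jsonLiteral(value) + "}";
    }

    // A field declared with a plain (non-union) type is written without a wrapper.
    static String encodePlain(Object value) {
        return jsonLiteral(value);
    }

    public static void main(String[] args) {
        System.out.println(encodeUnion(List.of("null", "string"), null)); // null
        System.out.println(encodeUnion(List.of("null", "string"), "a"));  // {"string": "a"}
        System.out.println(encodeUnion(List.of("boolean"), false));       // {"boolean": false}
        System.out.println(encodePlain(false));                           // false
    }
}
```

The third call mirrors isSynthetic: a union with a single branch is still a union, so the value is wrapped. If that reading is right, declaring the field as "type": "boolean" instead of "type": ["boolean"] should remove the wrapper for that field, while the nullable fields tsEntityCreated and tsEntityUpdated stay wrapped whenever the record is rendered with Avro's JSON encoding.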