Debezium,Kafka connect:有没有办法只发送有效负载而不发送模式?
Debezium, Kafka connect: is there a way to send only payload and not schema?
我在 kafka connect 中有一个发件箱 postgresql table 和 debezium 连接器,它根据添加到 table.
的行创建 kafka 消息
我面临的问题是消息格式。这是创建的消息值:
{
"schema": {
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1
},
"payload": "{\"foo\": \"bar\"}"
}
但是(因为消费者)我需要消息只包含有效负载,如下所示:
{
"\"foo\": \"bar\""
}
这是我的 kafka 连接器配置的一部分:
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.topic.replacement": "${routedByValue}",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.table.field.event.payload.id": "aggregate_id",
"transforms.outbox.table.fields.additional.placement": "payload_type:header:__TypeId__"
有什么方法可以在不创建自定义转换器的情况下实现这一点?
看起来您正在使用 org.apache.kafka.connect.json.JsonConverter
和 schemas.enable=true
作为您的值转换器。当您执行此操作时,它会将架构与消息中的有效负载一起嵌入。
如果您设置 value.converter.schemas.enable=false
,您应该只获取消息中的负载。
参考:Kafka Connect: Converters and Serialization Explained — JSON and Schemas
我在 kafka connect 中有一个发件箱 postgresql table 和 debezium 连接器,它根据添加到 table.
的行创建 kafka 消息我面临的问题是消息格式。这是创建的消息值:
{
"schema": {
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1
},
"payload": "{\"foo\": \"bar\"}"
}
但是(因为消费者)我需要消息只包含有效负载,如下所示:
{
"\"foo\": \"bar\""
}
这是我的 kafka 连接器配置的一部分:
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.topic.replacement": "${routedByValue}",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.table.field.event.payload.id": "aggregate_id",
"transforms.outbox.table.fields.additional.placement": "payload_type:header:__TypeId__"
有什么方法可以在不创建自定义转换器的情况下实现这一点?
看起来您正在使用 org.apache.kafka.connect.json.JsonConverter
和 schemas.enable=true
作为您的值转换器。当您执行此操作时,它会将架构与消息中的有效负载一起嵌入。
如果您设置 value.converter.schemas.enable=false
,您应该只获取消息中的负载。
参考:Kafka Connect: Converters and Serialization Explained — JSON and Schemas