Kafka Connect 和 Debezium MySQL source - 如何去掉消息 Key 中的 Struct{}?
Kafka Connect and Debezium MySQL source - How do you get rid of Struct{} in the message Key?
debezium MySQL 连接器生成的 Kafka 消息密钥类似于 Struct{id=1}
而不是 1
。
我的配置如下:
connector.class=io.debezium.connector.mysql.MySqlConnector
name=mysql-source
database.server.name=dbserver1
tasks.max=1
database.hostname=127.0.0.1
database.port=3306
database.user=root
database.password=*****
database.server.id=1
topic.creation.enable=true
topic.creation.default.replication.factor=-1
topic.creation.default.partitions=1
database.history.kafka.bootstrap.servers=*****
database.history.kafka.topic=dbserver1-schema-history
database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.sasl.mechanism=PLAIN
database.history.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="JM6MDWHYK4SKMDX6" password="Vvah2zS6bDaYAUYgYiod/iatMtEjC6vnwIuNM847JdwH+M+SSQzhxsTSI+GnjY5z";
database.history.producer.security.protocol=SASL_SSL
database.history.producer.sasl.mechanism=PLAIN
database.history.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="****" password="****";
并添加
transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Key
无效。
然后是sink时delete模式的问题,因为目标数据库的主键列是这样的:
|id |
---------------
|Struct{id=x}|
有什么帮助吗?
Flatten 转换将嵌套结构转换为平面结构。您似乎在寻找 ExtractField$Key
...
或者,使用 value.converter
的 JSON/Avro/Protobuf 选项而不是 StringConverter
来删除 Struct.toString()
表示。
这还会将键值对提取到与您的接收器连接器兼容的数据中
debezium MySQL 连接器生成的 Kafka 消息密钥类似于 Struct{id=1}
而不是 1
。
我的配置如下:
connector.class=io.debezium.connector.mysql.MySqlConnector
name=mysql-source
database.server.name=dbserver1
tasks.max=1
database.hostname=127.0.0.1
database.port=3306
database.user=root
database.password=*****
database.server.id=1
topic.creation.enable=true
topic.creation.default.replication.factor=-1
topic.creation.default.partitions=1
database.history.kafka.bootstrap.servers=*****
database.history.kafka.topic=dbserver1-schema-history
database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.sasl.mechanism=PLAIN
database.history.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="JM6MDWHYK4SKMDX6" password="Vvah2zS6bDaYAUYgYiod/iatMtEjC6vnwIuNM847JdwH+M+SSQzhxsTSI+GnjY5z";
database.history.producer.security.protocol=SASL_SSL
database.history.producer.sasl.mechanism=PLAIN
database.history.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="****" password="****";
并添加
transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Key
无效。
然后是sink时delete模式的问题,因为目标数据库的主键列是这样的:
|id |
---------------
|Struct{id=x}|
有什么帮助吗?
Flatten 转换将嵌套结构转换为平面结构。您似乎在寻找 ExtractField$Key
...
或者,使用 value.converter
的 JSON/Avro/Protobuf 选项而不是 StringConverter
来删除 Struct.toString()
表示。
这还会将键值对提取到与您的接收器连接器兼容的数据中