Kafka Connect 和 Debezium MySQL source - 如何去掉消息 Key 中的 Struct{}?

Kafka Connect and Debezium MySQL source - How do you get rid of Struct{} in the message Key?

debezium MySQL 连接器生成的 Kafka 消息密钥类似于 Struct{id=1} 而不是 1

我的配置如下:

connector.class=io.debezium.connector.mysql.MySqlConnector
name=mysql-source
database.server.name=dbserver1
tasks.max=1
database.hostname=127.0.0.1
database.port=3306
database.user=root
database.password=*****
database.server.id=1
topic.creation.enable=true
topic.creation.default.replication.factor=-1
topic.creation.default.partitions=1
database.history.kafka.bootstrap.servers=*****
database.history.kafka.topic=dbserver1-schema-history
database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.sasl.mechanism=PLAIN
database.history.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="JM6MDWHYK4SKMDX6" password="Vvah2zS6bDaYAUYgYiod/iatMtEjC6vnwIuNM847JdwH+M+SSQzhxsTSI+GnjY5z";
database.history.producer.security.protocol=SASL_SSL
database.history.producer.sasl.mechanism=PLAIN
database.history.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="****" password="****";

并添加

transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Key

无效。

然后是sink时delete模式的问题,因为目标数据库的主键列是这样的:

|id          |
---------------
|Struct{id=x}|

有什么帮助吗?

Flatten 转换将嵌套结构转换为平面结构。您似乎在寻找 ExtractField$Key...

或者,使用 value.converter 的 JSON/Avro/Protobuf 选项而不是 StringConverter 来删除 Struct.toString() 表示。

这还会将键值对提取到与您的接收器连接器兼容的数据中