How to separate database.history messages and messages regarding rows in Debezium connector?

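I created a Debezium connector in Kafka Connect. It works, but as soon as I add any SMT it breaks: messages about DDL changes (for example, a change to a table's schema) have a different structure from messages about changes to rows.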

{
    "name": "test_debezium",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "localhost",
        "database.port": "3306",
        "database.user": "kafkaconnect",
        "database.password": "***",
        "database.server.name": "test-debezium",
        "database.include.list": "database",
        "table.include.list": "database.table",
        "database.history.kafka.topic": "test_debezium_history",
        "database.history.kafka.recovery.poll.interval.ms": 5000,
        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "transforms": "createKey,extractInt",
        "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields": "client_id",
        "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractInt.field": "client_id"
    }
}

How can I tell the database.history messages apart from the row-change messages on the database.table topic?

Example DDL message:

{"databaseName":"database"}-{"source":{"version":"1.5.0-SNAPSHOT","connector":"mysql","name":"test-debezium","ts_ms":0,"snapshot":{"string":"true"},"db":"database","table":null,"server_id":0,"gtid":null,"file":"mysql-bin.000","pos":000,"row":0,"thread":null,"query":null},"databaseName":"database","ddl":"CREATE DATABASE IF EXISTS `database`"}

Example row message:

{"client_id":328}-{"before":null,"after":{"test-debezium.database.table.Value":{"client_id":328,"first_name":"Ignacy","uuid":"000"}},"source":{"version":"1.5.0-SNAPSHOT","connector":"mysql","name":"test-debezium","ts_ms":0,"snapshot":{"string":"true"},"db":"database","table":{"string":"table"},"server_id":0,"gtid":null,"file":"mysql-bin.000","pos":000,"row":0,"thread":null,"query":null},"op":"c","ts_ms":{"long":1611301059407},"transaction":null}

So the same key extractor doesn't work for both. How can I use different extractors for different topics, or apply the extractor only to row messages?
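The mismatch is already visible in the keys: the sample row message is keyed by {"client_id":328}, while the sample DDL message is keyed by {"databaseName":"database"}. Below is a minimal Python sketch (not Kafka Connect code) of what an ExtractField$Key-style step with field=client_id does to each of those keys. `extract_field_key` is a hypothetical helper; the real SMT reports the missing field with its own exception rather than the `ValueError` used here.

```python
from typing import Any, Dict


def extract_field_key(key: Dict[str, Any], field: str) -> Any:
    # Rough model of ExtractField$Key: replace the record key with one of its fields.
    # The real SMT fails when the field is absent; this sketch raises ValueError instead.
    if field not in key:
        raise ValueError(f"Unknown field: {field}")
    return key[field]


row_key = {"client_id": 328}            # key of the sample row message
ddl_key = {"databaseName": "database"}  # key of the sample DDL message

print(extract_field_key(row_key, "client_id"))  # 328

try:
    extract_field_key(ddl_key, "client_id")
except ValueError as err:
    print("DDL message fails:", err)
```

The same extractor succeeds on one message shape and fails on the other, which is why the transform has to be limited to the row messages.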

Thanks!

You can use the Predicate option for Single Message Transforms (SMTs), which was added in Apache Kafka 2.6.

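You need to define a predicate, and here we base it on the topic name. Because the target topics for table data can vary, we build the predicate on the fixed name of the history topic: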

"predicates.isHistoryTopic.type"   : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHistoryTopic.pattern": "test_debezium_history"
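A rough Python model of the TopicNameMatches predicate, as a sketch. It assumes, based on our reading of the Kafka source, that the regex must match the whole topic name (Java's `Matcher.matches()`, which behaves like Python's `re.fullmatch`). `topic_name_matches` and `HISTORY_PATTERN` are illustrative names, not Kafka APIs. Also keep in mind that the pattern is a regular expression, so characters such as `.` are not literal.

```python
import re
from typing import Optional


# Illustrative model of TopicNameMatches (not Kafka code): true only when the
# whole topic name matches the regex, assumed to mirror Java's Matcher.matches().
def topic_name_matches(pattern: str, topic: Optional[str]) -> bool:
    return topic is not None and re.fullmatch(pattern, topic) is not None


HISTORY_PATTERN = "test_debezium_history"  # value of predicates.isHistoryTopic.pattern

for topic in ["test_debezium_history",
              "test-debezium.database.table",
              "test_debezium_history_v2"]:
    print(f"{topic}: {topic_name_matches(HISTORY_PATTERN, topic)}")
```

One caveat worth checking: in Debezium's MySQL connector the database.history.kafka.topic is, as far as we know, written by the connector's own producer rather than through the Connect SMT chain, and DDL events keyed by databaseName (like the sample above) are normally emitted on the schema change topic named after database.server.name (here test-debezium). If that is where your DDL messages land, the predicate pattern would need to match that topic instead.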

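Then, for each Single Message Transform whose execution we want to restrict, we add the predicate to that transform's configuration. Because we want the transform not to run on messages that match the predicate, we also set the negate option.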

The final configuration looks like this:

"transforms"                       : "createKey,extractInt",
"transforms.createKey.type"        : "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields"      : "client_id",
"transforms.createKey.predicate"   : "isHistoryTopic",
"transforms.createKey.negate"      : "true",

"transforms.extractInt.type"       : "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field"      : "client_id",
"transforms.extractInt.predicate"  : "isHistoryTopic",
"transforms.extractInt.negate"     : "true",

"predicates"                       : "isHistoryTopic",
"predicates.isHistoryTopic.type"   : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHistoryTopic.pattern": "test_debezium_history"
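To illustrate how predicate and negate gate each transform, here is a small Python simulation of the final configuration. It is a sketch, not Kafka Connect: records are plain dicts, and `value_to_key`, `extract_field_key`, `predicated` and `run_chain` are hypothetical helpers approximating ValueToKey, ExtractField$Key and Connect's predicated-transform wrapper, which applies the transform only when `predicate XOR negate` is true. The row value is assumed to be flat, with client_id at the top level; the real Debezium envelope nests it under "after", which a top-level ValueToKey would not look into.

```python
import re
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]  # {"topic": str, "key": Any, "value": Any}
Step = Callable[[Record], Record]


def topic_name_matches(pattern: str) -> Callable[[Record], bool]:
    # Approximates TopicNameMatches: full-regex match on the topic name.
    compiled = re.compile(pattern)
    return lambda r: r["topic"] is not None and compiled.fullmatch(r["topic"]) is not None


def value_to_key(fields: List[str]) -> Step:
    # Approximates ValueToKey: build a new key from top-level value fields.
    def apply(r: Record) -> Record:
        for f in fields:
            if f not in r["value"]:
                raise ValueError(f"Field does not exist: {f}")
        return {**r, "key": {f: r["value"][f] for f in fields}}
    return apply


def extract_field_key(field: str) -> Step:
    # Approximates ExtractField$Key: replace the key with one of its fields.
    def apply(r: Record) -> Record:
        if field not in r["key"]:
            raise ValueError(f"Unknown field: {field}")
        return {**r, "key": r["key"][field]}
    return apply


def predicated(transform: Step, predicate: Callable[[Record], bool], negate: bool = False) -> Step:
    # Run the transform only when predicate(record) XOR negate is true.
    def apply(r: Record) -> Record:
        return transform(r) if predicate(r) != negate else r
    return apply


is_history_topic = topic_name_matches("test_debezium_history")
chain = [
    predicated(value_to_key(["client_id"]), is_history_topic, negate=True),   # createKey
    predicated(extract_field_key("client_id"), is_history_topic, negate=True),  # extractInt
]


def run_chain(record: Record) -> Record:
    for step in chain:
        record = step(record)
    return record


row = {"topic": "test-debezium.database.table",
       "key": {"client_id": 328},
       "value": {"client_id": 328, "first_name": "Ignacy", "uuid": "000"}}
ddl = {"topic": "test_debezium_history",
       "key": {"databaseName": "database"},
       "value": {"databaseName": "database", "ddl": "CREATE DATABASE IF EXISTS `database`"}}

print(run_chain(row)["key"])  # 328
print(run_chain(ddl)["key"])  # {'databaseName': 'database'}
```

Because both transforms carry the same predicate with negate set, a record on the matching topic passes through the whole chain untouched, while every other record gets its key rebuilt and flattened to the integer client_id.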

This blog and video cover the Predicate option in detail.