How to separate database.history messages from row-change messages in a Debezium connector?
I created a Debezium connector in Kafka Connect. It works, but as soon as I add any SMT it breaks: DDL changes (e.g. changes to a table's schema) have a different message structure than row changes.
{
"name": "test_debezium",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "kafkaconnect",
"database.password": "***",
"database.server.name": "test-debezium",
"database.include.list": "database",
"table.include.list": "database.table",
"database.history.kafka.topic": "test_debezium_history",
"database.history.kafka.recovery.poll.interval.ms": 5000,
"database.history.kafka.bootstrap.servers": "localhost:9092",
"transforms": "createKey,extractInt",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "client_id",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "client_id"
}
}
How can I tell database.history messages apart from messages on the database.table topic, which carry the row changes?
Example DDL message:
{"databaseName":"database"}-{"source":{"version":"1.5.0-SNAPSHOT","connector":"mysql","name":"test-debezium","ts_ms":0,"snapshot":{"string":"true"},"db":"database","table":null,"server_id":0,"gtid":null,"file":"mysql-bin.000","pos":000,"row":0,"thread":null,"query":null},"databaseName":"database","ddl":"CREATE DATABASE IF EXISTS `database`"}
Example row message:
{"client_id":328}-{"before":null,"after":{"test-debezium.database.table.Value":{"client_id":328,"first_name":"Ignacy","uuid":"000"}},"source":{"version":"1.5.0-SNAPSHOT","connector":"mysql","name":"test-debezium","ts_ms":0,"snapshot":{"string":"true"},"db":"database","table":{"string":"table"},"server_id":0,"gtid":null,"file":"mysql-bin.000","pos":000,"row":0,"thread":null,"query":null},"op":"c","ts_ms":{"long":1611301059407},"transaction":null}
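To make the mismatch concrete, here is a simplified Python sketch that applies a client_id field extraction to the two sample keys above. The helper extract_field is hypothetical: it only approximates the schema-based path of Kafka's ExtractField$Key, which, as far as I know, rejects a field the key struct does not have instead of returning null.

```python
# Keys copied from the two sample messages above.
ROW_KEY = {"client_id": 328}
DDL_KEY = {"databaseName": "database"}


def extract_field(key: dict, field: str):
    """Hypothetical stand-in for ExtractField$Key with a schema.

    Returns the named field of the key struct, or raises if the struct
    has no such field.
    """
    if field not in key:
        raise KeyError(f"Unknown field: {field}")
    return key[field]


def try_extract(key: dict, field: str) -> str:
    """Describe what happens when the extraction is applied to this key."""
    try:
        return f"ok -> {extract_field(key, field)!r}"
    except KeyError as exc:
        return f"fails: {exc.args[0]}"


if __name__ == "__main__":
    print("row key:", try_extract(ROW_KEY, "client_id"))
    print("DDL key:", try_extract(DDL_KEY, "client_id"))
```

The row key yields 328, while the DDL key has only databaseName, so one extractor configured for client_id cannot serve both message types.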
So the same key extractor doesn't work. How can I use different extractors for different topics, or apply the extractor only to row messages?
Thanks!
You can use the Predicate option for Single Message Transforms, which was added in Apache Kafka 2.6.
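First you need to define a predicate, which we will base on the topic name. Because the target topics for table data can vary, we build the predicate on the fixed name of the history topic: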
"predicates" : "isHistoryTopic",
"predicates.isHistoryTopic.type" : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHistoryTopic.pattern": "test_debezium_history"
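TopicNameMatches compares the regular expression against the whole topic name (Java's Matcher.matches semantics), so a pattern does not match a topic that merely contains it. The Python sketch below approximates that with re.fullmatch; topic_name_matches is a hypothetical helper, and Python and Java regex syntax agree for simple patterns like these. One hedged caveat: the DDL sample's key (databaseName) and value (databaseName, ddl) look like a Debezium schema change event, which the Debezium MySQL documentation describes as being published to a topic named after database.server.name (test-debezium here), while the history topic is written by the connector itself for internal use. If that is what you are seeing, the pattern needs to match test-debezium instead; full matching keeps such a pattern from also catching table topics like test-debezium.database.table.

```python
import re
from typing import Optional


def topic_name_matches(pattern: str, topic: Optional[str]) -> bool:
    """Approximates TopicNameMatches: the pattern must match the entire topic name."""
    return topic is not None and re.fullmatch(pattern, topic) is not None


# Topic names assumed from the question's config: the history topic,
# the server-name topic, and the table topic (server.database.table).
TOPICS = ["test_debezium_history", "test-debezium", "test-debezium.database.table"]

if __name__ == "__main__":
    for pattern in ("test_debezium_history", "test-debezium"):
        for topic in TOPICS:
            print(pattern, topic, topic_name_matches(pattern, topic))
```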
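Then, for each Single Message Transform whose execution we want to restrict, we add predicate to that transform's configuration. Because we want the transform to run only on messages that do not match the predicate, we also add the negate option.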
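The negate flag inverts the predicate's result. As I read Kafka's PredicatedTransformation, a transform runs only when negate XOR predicate(record) is true; otherwise the record passes through unchanged. This Python sketch mirrors that rule. tag_transformed is a hypothetical transform standing in for createKey or extractInt, and is_history_topic stands in for the isHistoryTopic predicate.

```python
import re
from typing import Callable, Dict

Record = Dict[str, object]
Transform = Callable[[Record], Record]
Predicate = Callable[[Record], bool]


def is_history_topic(record: Record) -> bool:
    # Stand-in for the isHistoryTopic predicate (TopicNameMatches, full match).
    return re.fullmatch("test_debezium_history", str(record["topic"])) is not None


def tag_transformed(record: Record) -> Record:
    # Hypothetical transform: returns a copy flagged as transformed.
    return {**record, "transformed": True}


def apply_predicated(transform: Transform, predicate: Predicate,
                     negate: bool, record: Record) -> Record:
    # Run the transform only when (negate XOR predicate); otherwise pass through.
    if negate ^ predicate(record):
        return transform(record)
    return record


if __name__ == "__main__":
    for topic in ("test_debezium_history", "test-debezium.database.table"):
        out = apply_predicated(tag_transformed, is_history_topic, True, {"topic": topic})
        print(topic, "->", "transformed" if out.get("transformed") else "passed through")
```

With negate set to true, the history topic passes through untouched and the table topic is transformed, which is the behavior the configuration aims for.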
The final configuration looks like this:
"transforms" : "createKey,extractInt",
"transforms.createKey.type" : "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields" : "client_id",
"transforms.createKey.predicate" : "isHistoryTopic",
"transforms.createKey.negate" : "true",
"transforms.extractInt.type" : "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field" : "client_id",
"transforms.extractInt.predicate" : "isHistoryTopic",
"transforms.extractInt.negate" : "true",
"predicates" : "isHistoryTopic",
"predicates.isHistoryTopic.type" : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHistoryTopic.pattern": "test_debezium_history"
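Because each transform refers to the predicate by alias, a typo in the alias or a missing entry in predicates is easy to overlook. As a rough pre-flight check, the hypothetical Python helper below takes the final configuration as a dict and reports transforms whose predicate alias is not declared, declared predicates without a type, and negate values that are not booleans. These are this sketch's own checks, not Kafka Connect's full validation.

```python
from typing import Dict, List

# The final configuration from the answer, as string key/value pairs.
FINAL_CONFIG: Dict[str, str] = {
    "transforms": "createKey,extractInt",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "client_id",
    "transforms.createKey.predicate": "isHistoryTopic",
    "transforms.createKey.negate": "true",
    "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractInt.field": "client_id",
    "transforms.extractInt.predicate": "isHistoryTopic",
    "transforms.extractInt.negate": "true",
    "predicates": "isHistoryTopic",
    "predicates.isHistoryTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.isHistoryTopic.pattern": "test_debezium_history",
}


def aliases(value: str) -> List[str]:
    """Split a comma-separated alias list, ignoring blanks and spaces."""
    return [a.strip() for a in value.split(",") if a.strip()]


def check_predicate_wiring(config: Dict[str, str]) -> List[str]:
    """Hypothetical pre-flight check; returns a list of problems (empty if none)."""
    problems: List[str] = []
    declared = aliases(config.get("predicates", ""))
    for alias in declared:
        if f"predicates.{alias}.type" not in config:
            problems.append(f"predicate {alias} has no type")
    for name in aliases(config.get("transforms", "")):
        pred = config.get(f"transforms.{name}.predicate")
        if pred is not None and pred not in declared:
            problems.append(f"transform {name} uses undeclared predicate {pred}")
        negate = config.get(f"transforms.{name}.negate")
        if negate is not None and negate.lower() not in ("true", "false"):
            problems.append(f"transform {name} has non-boolean negate {negate}")
    return problems


if __name__ == "__main__":
    print(check_predicate_wiring(FINAL_CONFIG))  # -> []
```

For the configuration above it reports nothing; clearing predicates makes it flag both createKey and extractInt.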