Configure a Debezium connector for multiple tables in a database
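I am trying to configure a Debezium connector for multiple tables in a MySQL database (I'm using Debezium 1.4 on MySQL 8.0).
My company has a naming pattern to follow when creating topics in Kafka. The pattern does not allow underscores (_), so I have to replace them with hyphens (-).
So, my topic names are: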
Topic 1
fjf.db.top-domain.domain.sub-domain.transaction-search.order-status
where
- transaction-search = schema "transaction_search"
- order-status = table "order_status"
- All changes in that table must go to that topic.
Topic 2
fjf.db.top-domain.domain.sub-domain.transaction-search.shipping-tracking
where
- transaction-search = schema "transaction_search"
- shipping-tracking = table "shipping_tracking"
- All changes in that table must go to that topic.
Topic 3
fjf.db.top-domain.domain.sub-domain.transaction-search.proposal
where
- transaction-search = schema "transaction_search"
- proposal = table "proposal"
- All changes in that table must go to that topic.
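The naming rule above can be written down as a small function, which is handy as a check for whatever routing configuration ends up being used. This is only a sketch: `PREFIX` and `expected_topic` are names made up for illustration, and the rule (keep the fixed prefix, turn underscores in the table name into hyphens) is inferred from the three examples.

```python
# Hypothetical helper: derives the target topic for a table from the naming rule.
PREFIX = "fjf.db.top-domain.domain.sub-domain.transaction-search"


def expected_topic(table: str) -> str:
    """Return the topic that a table's change events should land in."""
    # Underscores are not allowed in topic names, so they become hyphens.
    return f"{PREFIX}.{table.replace('_', '-')}"


for table in ("order_status", "shipping_tracking", "proposal"):
    print(table, "->", expected_topic(table))
```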
I'm trying to use the "ByLogicalTableRouter" transform, but I can't find a regular expression that solves my case.
{
  "name": "debezium.connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "myhostname",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "password",
    "database.server.id": "1000",
    "database.server.name": "fjf.db.top-domain.domain.sub-domain.transaction-search",
    "schema.include.list": "transaction_search",
    "table.include.list": "transaction_search.order_status,transaction_search.shipping_tracking,transaction_search.proposal",
    "database.history.kafka.bootstrap.servers": "kafka.intranet:9097",
    "database.history.kafka.topic": "fjf.db.top-domain.domain.sub-domain.transaction-search.schema-history",
    "snapshot.mode": "schema_only",
    "transforms": "RerouteName,RerouteUnderscore",
    "transforms.RerouteName.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.RerouteName.topic.regex": "(.*)transaction_search(.*)",
    "transforms.RerouteName.topic.replacement": "",
    "transforms.RerouteUnderscore.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.RerouteUnderscore.topic.regex": "(.*)_(.*)",
    "transforms.RerouteUnderscore.topic.replacement": "-"
  }
}
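- In the first transform, I'm trying to remove the duplicated schema name from the routed topic name.
- In the second transform, I'm trying to replace all remaining underscores (_) with hyphens (-).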
However, I get the error below, which suggests that it is trying to send everything to the same topic:
Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Cannot create field because of field name duplication __dbz__physicalTableIdentifier
How can I write the transforms so that each table's events are forwarded to its own topic?
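- Removing the schema name
In the first transform, I'm trying to remove the duplicated schema name in the topic routing.
After the transformation with your regex you will be left with two consecutive dots, so you need to capture the parts on both sides and rejoin them with a single dot:
"transforms.RerouteName.topic.regex": "([^.]+)\\.transaction_search\\.([^.]+)",
"transforms.RerouteName.topic.replacement": "$1.$2"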
- Replacing underscores with hyphens
You can try the ChangeCase SMT from Kafka Connect Common Transformations.
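To see what each regex does to the topic names, the routing can be approximated outside Kafka Connect. The sketch below is not Debezium code: `route` and `reroute` are hypothetical helpers, and it assumes the router rewrites a topic only when the regex matches the whole name and expands Java-style `$1` references, which is worth verifying against your Debezium version. Under that assumption a first group of `[^.]+` cannot span the dotted server name, so the sketch swaps in `(.*)` for both groups.

```python
import re


def route(topic: str, regex: str, replacement: str) -> str:
    """Approximate a regex-routing transform (assumption: full match required)."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic  # no full match: the topic name is left unchanged
    # Translate Java-style "$1" references into Python's "\g<1>" form.
    py_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return match.expand(py_replacement)


# Topic names as the connector emits them: <server name>.<schema>.<table>
SERVER = "fjf.db.top-domain.domain.sub-domain.transaction-search"
TABLES = ["order_status", "shipping_tracking", "proposal"]
SOURCE_TOPICS = [f"{SERVER}.transaction_search.{t}" for t in TABLES]


def reroute(topic: str) -> str:
    # Step 1: drop the duplicated schema segment, keeping exactly one dot.
    topic = route(topic, r"(.*)\.transaction_search\.(.*)", "$1.$2")
    # Step 2: replace the one remaining underscore (if any) with a hyphen.
    return route(topic, r"(.*)_(.*)", "$1-$2")


for source in SOURCE_TOPICS:
    print(source, "->", reroute(source))

# The question's first transform, for comparison: an empty replacement
# maps every table to the same (empty) topic name.
collapsed = {route(t, r"(.*)transaction_search(.*)", "") for t in SOURCE_TOPICS}
print("question's config yields:", collapsed)
```

With these assumptions, the question's empty replacement collapses all three tables into one empty topic name, while `$1.$2` followed by `$1-$2` yields the three target topics. A table name with more than one underscore would need a further pass, since each pass replaces a single underscore. Separately, `__dbz__physicalTableIdentifier` is the key field that ByLogicalTableRouter adds by default, so chaining two instances of it may contribute to the duplication error; that is a guess, not something confirmed here.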