如何更改Kafka Connect Source Connector生成的topic名称
How to change the name of the topic generated by Kafka Connect Source Connector
我已经 运行 生产部署了 Kafka-Cluster 并且有主题“existing-topic”。我正在使用 Debezium 的 MongoDB-Source-Connector。
这里我想要的是将 CDC 事件直接推送到主题“existing-topic”,以便已经收听该主题的消费者将处理它。
我没有找到任何资源来这样做,但是有人提到主题是按以下格式创建的 -
"If your mongodb.name parameter is A, database name is B and collection name is C, the data from database A and collection C will be loaded under the topic A.B.C"
我可以将主题更改为 "existing-topic" 并将事件推送到它吗?
根据 documentation、
The name of the Kafka topics always takes the form
logicalName.databaseName.collectionName
, where logicalName
is the
logical name of the connector as specified with the mongodb.name
configuration property, databaseName
is the name of the database where
the operation occurred, and collectionName
is the name of the MongoDB
collection in which the affected document existed.
这意味着如果您的连接器的逻辑名称是 myConnector
并且您的数据库 myDatabase
有两个集合 users
和 orders
{
"name": "myConnector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": "mongo-db-host:27017",
"mongodb.name": "myDatabase",
"collection.whitelist": "myDatabase[.]*",
}
}
然后 Kafka Connect 将使用名称填充两个主题:
myConnector.myDatabase.users
myConnector.myDatabase.orders
现在,如果您仍想更改目标主题的名称,可以使用 Kafka Connect Single Message Transforms (SMT)。更准确地说,ExtractTopic
应该可以帮助您。请注意,尽管此 SMT 可帮助您从消息的键或值中提取主题名称,因此您需要以某种方式在有效负载中包含所需的主题名称。
例如,下面的 SMT 将提取字段 myField
的值并将其用作记录的主题:
transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ValueFieldExample.field=myField
我在使用 JDBC 源连接器时遇到了同样的问题,但找到了不同的解决方案:
通过 dropPrefix
使用 RegexRouter
单消息转换,您可以覆盖整个主题名称:
"transforms":"dropPrefix",
"transforms.dropPrefix.regex":"A.B.C", // whole created topic name
"transforms.dropPrefix.replacement":"existing-topic" // whole exisiting topic name
它适用于正则表达式,因此如果您使用多个 tables/collections 并且您创建的主题名称不是固定的,您应该能够使其动态化。
这有点老套,因为从技术上讲,我删除了整个主题名称,然后添加了一个新主题名称 - 无论如何,这对我来说不是最好的解决方案。
我已经 运行 生产部署了 Kafka-Cluster 并且有主题“existing-topic”。我正在使用 Debezium 的 MongoDB-Source-Connector。
这里我想要的是将 CDC 事件直接推送到主题“existing-topic”,以便已经收听该主题的消费者将处理它。
我没有找到任何资源来这样做,但是有人提到主题是按以下格式创建的 -
"If your mongodb.name parameter is A, database name is B and collection name is C, the data from database A and collection C will be loaded under the topic A.B.C"
我可以将主题更改为 "existing-topic" 并将事件推送到它吗?
根据 documentation、
The name of the Kafka topics always takes the form
logicalName.databaseName.collectionName
, wherelogicalName
is the logical name of the connector as specified with themongodb.name
configuration property,databaseName
is the name of the database where the operation occurred, andcollectionName
is the name of the MongoDB collection in which the affected document existed.
这意味着如果您的连接器的逻辑名称是 myConnector
并且您的数据库 myDatabase
有两个集合 users
和 orders
{
"name": "myConnector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": "mongo-db-host:27017",
"mongodb.name": "myDatabase",
"collection.whitelist": "myDatabase[.]*",
}
}
然后 Kafka Connect 将使用名称填充两个主题:
myConnector.myDatabase.users
myConnector.myDatabase.orders
现在,如果您仍想更改目标主题的名称,可以使用 Kafka Connect Single Message Transforms (SMT)。更准确地说,ExtractTopic
应该可以帮助您。请注意,尽管此 SMT 可帮助您从消息的键或值中提取主题名称,因此您需要以某种方式在有效负载中包含所需的主题名称。
例如,下面的 SMT 将提取字段 myField
的值并将其用作记录的主题:
transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ValueFieldExample.field=myField
我在使用 JDBC 源连接器时遇到了同样的问题,但找到了不同的解决方案:
通过 dropPrefix
使用 RegexRouter
单消息转换,您可以覆盖整个主题名称:
"transforms":"dropPrefix",
"transforms.dropPrefix.regex":"A.B.C", // whole created topic name
"transforms.dropPrefix.replacement":"existing-topic" // whole exisiting topic name
它适用于正则表达式,因此如果您使用多个 tables/collections 并且您创建的主题名称不是固定的,您应该能够使其动态化。
这有点老套,因为从技术上讲,我删除了整个主题名称,然后添加了一个新主题名称 - 无论如何,这对我来说不是最好的解决方案。