如何更改Kafka Connect Source Connector生成的topic名称

How to change the name of the topic generated by Kafka Connect Source Connector

我已经 运行 生产部署了 Kafka-Cluster 并且有主题“existing-topic”。我正在使用 Debezium 的 MongoDB-Source-Connector。

这里我想要的是将 CDC 事件直接推送到主题“existing-topic”,以便已经收听该主题的消费者将处理它。

我没有找到任何资源来这样做,但是有人提到主题是按以下格式创建的 -

"If your mongodb.name parameter is A, database name is B and collection name is C, the data from database A and collection C will be loaded under the topic A.B.C"

我可以将主题更改为 "existing-topic" 并将事件推送到它吗?

根据 documentation

The name of the Kafka topics always takes the form logicalName.databaseName.collectionName, where logicalName is the logical name of the connector as specified with the mongodb.name configuration property, databaseName is the name of the database where the operation occurred, and collectionName is the name of the MongoDB collection in which the affected document existed.


这意味着如果您的连接器的逻辑名称是 myConnector 并且您的数据库 myDatabase 有两个集合 usersorders

{
  "name": "myConnector",  
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", 
    "mongodb.hosts": "mongo-db-host:27017", 
    "mongodb.name": "myDatabase", 
    "collection.whitelist": "myDatabase[.]*", 
  }
}

然后 Kafka Connect 将使用名称填充两个主题:

  • myConnector.myDatabase.users
  • myConnector.myDatabase.orders

现在,如果您仍想更改目标主题的名称,可以使用 Kafka Connect Single Message Transforms (SMT)。更准确地说,ExtractTopic 应该可以帮助您。请注意,尽管此 SMT 可帮助您从消息的键或值中提取主题名称,因此您需要以某种方式在有效负载中包含所需的主题名称。

例如,下面的 SMT 将提取字段 myField 的值并将其用作记录的主题:

 transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
 transforms.ValueFieldExample.field=myField

我在使用 JDBC 源连接器时遇到了同样的问题,但找到了不同的解决方案:

通过 dropPrefix 使用 RegexRouter 单消息转换,您可以覆盖整个主题名称:

"transforms":"dropPrefix",
"transforms.dropPrefix.regex":"A.B.C",                 // whole created topic name
"transforms.dropPrefix.replacement":"existing-topic"   // whole exisiting topic name

它适用于正则表达式,因此如果您使用多个 tables/collections 并且您创建的主题名称不是固定的,您应该能够使其动态化。

这有点老套,因为从技术上讲,我删除了整个主题名称,然后添加了一个新主题名称 - 无论如何,这对我来说不是最好的解决方案。