具有动态值的 Kafka Connect InsertField 转换

Kafka Connect InsertField transform with dynamic values

我 运行 遇到需要向 Kafka Connect 记录插入新字段的情况,但 InsertField 转换似乎仅限于静态值。

https://docs.confluent.io/current/connect/transforms/insertfield.html

有没有办法根据记录中的其他字段添加动态值?

我需要这个的原因是因为我正在使用 JDBC 源和接收器连接器在两个数据库之间传输数据。在水槽一侧,我正在做 upsert。因此,我需要以下字段:

        "insert.mode": "upsert",
        "pk.mode": "record_value",
        "pk.fields": "TABLE_ID",

这很好用,但它使我不得不为每个 table 拥有一个连接器文件,因为在我的源数据库中,所有 table 都具有 tableName_ID 形式的主键。所以 pk.fields 总是会有所不同。

我想在我的接收器数据库中我可以添加一个我的应用程序不知道的新字段,但 Kafka Connect 会使用它来更新主键。它会被称为 kafka_id 之类的东西,并且每个 table 都是一样的。我想在我的源配置中添加这个字段,然后在我的接收器中更新 pk.fields。有什么办法可以做到这一点?我需要编写自定义转换吗?谢谢!

在您的情况下,最好的选择是您自己的 Kafka Connect SMT:您可以使用来自 ConnectRecord 的主题信息。

Example of SMT from How to Use Single Message Transforms in Kafka Connect 条。