使用debezium将数据从MongoDB传输到kafka时如何进行类型转换?

How to do type conversion when transferring data from MongoDB to kafka with debezium?

在Mongodb中,objectid是base64。我正在使用 Debezium 将这些文档流式传输到 Kafka。如何让 ObjectId 在 kafka 中写成 UUID?

Mongo 示例文档:

{
    "_id" : BinData(3,"8D/JiwMtkEKSfrfKsxUe+g=="),
    "Version" : 5,
    "CreatedAt" : ISODate("2021-09-22T00:24:43.939+03:00"),
    "UpdatedAt" : ISODate("2021-09-22T00:32:53.096+03:00"),
    "AbidikId" : BinData(3,"CVebG2sIf0OtxnUNZIl39g=="),
    "GubidikId" : BinData(3,"U06d2Rk4nUG7Fz3iASM9LQ=="),
    "IsActive" : true,
    "BrandList" : [ "Sony2", "SUNY2" ],
    "CategoryIdList" : [ ]
}

Kafka 示例消息:

{
    "_id": "8D/JiwMtkEKSfrfKsxUe+g==",
    "Version": 5,
    "CreatedAt": 1632259483939,
    "UpdatedAt": 1632259973096,
    "AbidikId": "CVebG2sIf0OtxnUNZIl39g==",
    "GubidikId": "U06d2Rk4nUG7Fz3iASM9LQ==",
    "IsActive": true,
    "BrandList": [
       "Sony2",
       "SUNY2"
    ],
    "CategoryIdList": []
}

我期望 kafka 消息是什么?

"_id" : "8bc93ff0-2d03-4290-927e-b7cab3151efa",
"AbidikId": "1b9b5709-086b-437f-adc6-750d648977f6",
"GubidikId": "d99d4e53-3819-419d-bb17-3de201233d2d"

我在 Debezium 中找不到解决方案。我无法更改 MongoDB 中的 ID。提前谢谢你。

我们通过自定义smt解决了这个问题。

  1. 首先我们从这个 repo 创建了 java 项目:https://github.com/confluentinc/kafka-connect-insert-uuid

  2. 在此项目中自定义您的数据,例如转换或编辑。我们也可以从 debezium 配置中获取我们想要的字段。

  3. 通过 Maven 导出 jar。

  4. 从那个罐子里做 docker 图片。

  5. 在 debezium 配置中设置图像和字段:

    "transforms": "unwrap,Reroute,convertguid,insertKey", "transforms.convertguid.type":"com.example.kafka.connect.smt.Base64ToCsuuid$Value",

    "transforms.convertguid.csuuid.field.names":"_id,examplefield1,examplefield2",

  6. 重新创建 debezium 连接器。

最后我们可以在流式传输到 kafka 时将 mongo id 转换为 UUID。希望对有需要的人有所帮助。