使用debezium将数据从MongoDB传输到kafka时如何进行类型转换?
How to do type conversion when transferring data from MongoDB to kafka with debezium?
在Mongodb中,objectid是base64。我正在使用 Debezium 将这些文档流式传输到 Kafka。如何让 ObjectId 在 kafka 中写成 UUID?
Mongo 示例文档:
{
"_id" : BinData(3,"8D/JiwMtkEKSfrfKsxUe+g=="),
"Version" : 5,
"CreatedAt" : ISODate("2021-09-22T00:24:43.939+03:00"),
"UpdatedAt" : ISODate("2021-09-22T00:32:53.096+03:00"),
"AbidikId" : BinData(3,"CVebG2sIf0OtxnUNZIl39g=="),
"GubidikId" : BinData(3,"U06d2Rk4nUG7Fz3iASM9LQ=="),
"IsActive" : true,
"BrandList" : [ "Sony2", "SUNY2" ],
"CategoryIdList" : [ ]
}
Kafka 示例消息:
{
"_id": "8D/JiwMtkEKSfrfKsxUe+g==",
"Version": 5,
"CreatedAt": 1632259483939,
"UpdatedAt": 1632259973096,
"AbidikId": "CVebG2sIf0OtxnUNZIl39g==",
"GubidikId": "U06d2Rk4nUG7Fz3iASM9LQ==",
"IsActive": true,
"BrandList": [
"Sony2",
"SUNY2"
],
"CategoryIdList": []
}
我期望 kafka 消息是什么?
"_id" : "8bc93ff0-2d03-4290-927e-b7cab3151efa",
"AbidikId": "1b9b5709-086b-437f-adc6-750d648977f6",
"GubidikId": "d99d4e53-3819-419d-bb17-3de201233d2d"
我在 Debezium 中找不到解决方案。我无法更改 MongoDB 中的 ID。提前谢谢你。
我们通过自定义smt解决了这个问题。
首先我们从这个 repo 创建了 java 项目:https://github.com/confluentinc/kafka-connect-insert-uuid
在此项目中自定义您的数据,例如转换或编辑。我们也可以从 debezium 配置中获取我们想要的字段。
通过 Maven 导出 jar。
从那个罐子里做 docker 图片。
在 debezium 配置中设置图像和字段:
"transforms": "unwrap,Reroute,convertguid,insertKey",
"transforms.convertguid.type":"com.example.kafka.connect.smt.Base64ToCsuuid$Value",
"transforms.convertguid.csuuid.field.names":"_id,examplefield1,examplefield2",
重新创建 debezium 连接器。
最后我们可以在流式传输到 kafka 时将 mongo id 转换为 UUID。希望对有需要的人有所帮助。
在Mongodb中,objectid是base64。我正在使用 Debezium 将这些文档流式传输到 Kafka。如何让 ObjectId 在 kafka 中写成 UUID?
Mongo 示例文档:
{
"_id" : BinData(3,"8D/JiwMtkEKSfrfKsxUe+g=="),
"Version" : 5,
"CreatedAt" : ISODate("2021-09-22T00:24:43.939+03:00"),
"UpdatedAt" : ISODate("2021-09-22T00:32:53.096+03:00"),
"AbidikId" : BinData(3,"CVebG2sIf0OtxnUNZIl39g=="),
"GubidikId" : BinData(3,"U06d2Rk4nUG7Fz3iASM9LQ=="),
"IsActive" : true,
"BrandList" : [ "Sony2", "SUNY2" ],
"CategoryIdList" : [ ]
}
Kafka 示例消息:
{
"_id": "8D/JiwMtkEKSfrfKsxUe+g==",
"Version": 5,
"CreatedAt": 1632259483939,
"UpdatedAt": 1632259973096,
"AbidikId": "CVebG2sIf0OtxnUNZIl39g==",
"GubidikId": "U06d2Rk4nUG7Fz3iASM9LQ==",
"IsActive": true,
"BrandList": [
"Sony2",
"SUNY2"
],
"CategoryIdList": []
}
我期望 kafka 消息是什么?
"_id" : "8bc93ff0-2d03-4290-927e-b7cab3151efa",
"AbidikId": "1b9b5709-086b-437f-adc6-750d648977f6",
"GubidikId": "d99d4e53-3819-419d-bb17-3de201233d2d"
我在 Debezium 中找不到解决方案。我无法更改 MongoDB 中的 ID。提前谢谢你。
我们通过自定义smt解决了这个问题。
首先我们从这个 repo 创建了 java 项目:https://github.com/confluentinc/kafka-connect-insert-uuid
在此项目中自定义您的数据,例如转换或编辑。我们也可以从 debezium 配置中获取我们想要的字段。
通过 Maven 导出 jar。
从那个罐子里做 docker 图片。
在 debezium 配置中设置图像和字段:
"transforms": "unwrap,Reroute,convertguid,insertKey", "transforms.convertguid.type":"com.example.kafka.connect.smt.Base64ToCsuuid$Value",
"transforms.convertguid.csuuid.field.names":"_id,examplefield1,examplefield2",
重新创建 debezium 连接器。
最后我们可以在流式传输到 kafka 时将 mongo id 转换为 UUID。希望对有需要的人有所帮助。