如何转换和提取 Kafka sink JDBC 连接器中的字段
How to transform and extract fields in Kafka sink JDBC connector
我正在使用第 3 方 CDC 工具将数据从源数据库复制到 Kafka 主题中。示例行如下所示:
{
"data":{
"USER_ID":{
"string":"1"
},
"USER_CATEGORY":{
"string":"A"
}
},
"beforeData":{
"Data":{
"USER_ID":{
"string":"1"
},
"USER_CATEGORY":{
"string":"B"
}
}
},
"headers":{
"operation":"UPDATE",
"timestamp":"2018-05-03T13:53:43.000"
}
}
接收器文件中需要什么配置才能提取 data
和 headers
下的所有(子)字段并忽略 beforeData
下的那些,以便目标 table其中Kafka Sink将传输的数据将包含以下字段:
USER_ID, USER_CATEGORY, operation, timestamp
我浏览了 transformation list in confluent's docs 但我无法找到如何使用它们来实现上述目标。
我想你想要 ExtractField
,不幸的是,它是一个 Map.get
操作,所以这意味着 1) 嵌套字段不能一次性获得 2) 多个字段需要多次转换。
话虽如此,您可以尝试这个(未经测试)
transforms=ExtractData,ExtractHeaders
transforms.ExtractData.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractData.field=data
transforms.ExtractHeaders.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractHeaders.field=headers
如果这不起作用,您最好实现自己的 Transformations 包,该包至少可以从 Struct / Map 中删除值。
如果您愿意列出特定的字段名称,可以通过以下方式解决:
- 使用 Flatten 变换折叠嵌套(这会将原始结构的路径转换为 dot-delimited 个名称)
- 使用
rename
的替换转换使字段名称成为您希望接收器发出的名称
- 使用另一个替换为
whitelist
的转换将发出的字段限制为您 select
对于你的情况,它可能看起来像:
"transforms": "t1,t2,t3",
"transforms.t1.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.t2.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.t2.renames": "data.USER_ID:USER_ID,data.USER_CATEGORY:USER_CATEGORY,headers.operation:operation,headers.timestamp:timestamp",
"transforms.t3.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.t3.whitelist": "USER_ID,USER_CATEGORY,operation,timestamp",
我正在使用第 3 方 CDC 工具将数据从源数据库复制到 Kafka 主题中。示例行如下所示:
{
"data":{
"USER_ID":{
"string":"1"
},
"USER_CATEGORY":{
"string":"A"
}
},
"beforeData":{
"Data":{
"USER_ID":{
"string":"1"
},
"USER_CATEGORY":{
"string":"B"
}
}
},
"headers":{
"operation":"UPDATE",
"timestamp":"2018-05-03T13:53:43.000"
}
}
接收器文件中需要什么配置才能提取 data
和 headers
下的所有(子)字段并忽略 beforeData
下的那些,以便目标 table其中Kafka Sink将传输的数据将包含以下字段:
USER_ID, USER_CATEGORY, operation, timestamp
我浏览了 transformation list in confluent's docs 但我无法找到如何使用它们来实现上述目标。
我想你想要 ExtractField
,不幸的是,它是一个 Map.get
操作,所以这意味着 1) 嵌套字段不能一次性获得 2) 多个字段需要多次转换。
话虽如此,您可以尝试这个(未经测试)
transforms=ExtractData,ExtractHeaders
transforms.ExtractData.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractData.field=data
transforms.ExtractHeaders.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractHeaders.field=headers
如果这不起作用,您最好实现自己的 Transformations 包,该包至少可以从 Struct / Map 中删除值。
如果您愿意列出特定的字段名称,可以通过以下方式解决:
- 使用 Flatten 变换折叠嵌套(这会将原始结构的路径转换为 dot-delimited 个名称)
- 使用
rename
的替换转换使字段名称成为您希望接收器发出的名称 - 使用另一个替换为
whitelist
的转换将发出的字段限制为您 select
对于你的情况,它可能看起来像:
"transforms": "t1,t2,t3",
"transforms.t1.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.t2.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.t2.renames": "data.USER_ID:USER_ID,data.USER_CATEGORY:USER_CATEGORY,headers.operation:operation,headers.timestamp:timestamp",
"transforms.t3.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.t3.whitelist": "USER_ID,USER_CATEGORY,operation,timestamp",