如何转换和提取 Kafka sink JDBC 连接器中的字段

How to transform and extract fields in Kafka sink JDBC connector

我正在使用第 3 方 CDC 工具将数据从源数据库复制到 Kafka 主题中。示例行如下所示:

{  
   "data":{  
      "USER_ID":{  
         "string":"1"
      },
      "USER_CATEGORY":{  
         "string":"A"
      }
   },
   "beforeData":{  
      "Data":{  
         "USER_ID":{  
            "string":"1"
         },
         "USER_CATEGORY":{  
            "string":"B"
         }
      }
   },
   "headers":{  
      "operation":"UPDATE",
      "timestamp":"2018-05-03T13:53:43.000"
   }
}

接收器文件中需要什么配置才能提取 dataheaders 下的所有(子)字段并忽略 beforeData 下的那些,以便目标 table其中Kafka Sink将传输的数据将包含以下字段:

USER_ID, USER_CATEGORY, operation, timestamp

我浏览了 transformation list in confluent's docs 但我无法找到如何使用它们来实现上述目标。

我想你想要 ExtractField,不幸的是,它是一个 Map.get 操作,所以这意味着 1) 嵌套字段不能一次性获得 2) 多个字段需要多次转换。

话虽如此,您可以尝试这个(未经测试)

transforms=ExtractData,ExtractHeaders
transforms.ExtractData.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractData.field=data
transforms.ExtractHeaders.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractHeaders.field=headers

如果这不起作用,您最好实现自己的 Transformations 包,该包至少可以从 Struct / Map 中删除值。

如果您愿意列出特定的字段名称,可以通过以下方式解决:

  1. 使用 Flatten 变换折叠嵌套(这会将原始结构的路径转换为 ​​dot-delimited 个名称)
  2. 使用 rename 的替换转换使字段名称成为您希望接收器发出的名称
  3. 使用另一个替换为 whitelist 的转换将发出的字段限制为您 select

对于你的情况,它可能看起来像:

  "transforms": "t1,t2,t3",
  "transforms.t1.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.t2.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.t2.renames": "data.USER_ID:USER_ID,data.USER_CATEGORY:USER_CATEGORY,headers.operation:operation,headers.timestamp:timestamp",
  "transforms.t3.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.t3.whitelist": "USER_ID,USER_CATEGORY,operation,timestamp",