Kafka Connect 将 JSON 字符串转换为实际的 JSON

Kafka Connect transforming JSON string to actual JSON

我正在尝试弄清楚是否可以使用 Kafka Connect 将存储为字符串的 JSON 值转换为实际的 JSON 结构。

我试图寻找这样的转换,但找不到。例如,这可能是来源:

{
  "UserID":2105058535,
  "DocumentID":2105058535,
  "RandomJSON":"{\"Tags\":[{\"TagID\":1,\"TagName\":\"Java\"},{\"TagID\":2,\"TagName\":\"Kafka\"}]}"
}

这是我的目标:

{
  "UserID":2105058535,
  "DocumentID":2105058535,
  "RandomJSON":{
    "Tags":[
      {
        "TagID":1,
        "TagName":"Java"
      },
      {
        "TagID":2,
        "TagName":"Kafka"
      }
    ]
  }
}

我正在尝试对 Elasticsearch 接收器连接器进行这些转换,如果它有所作为的话。

我知道我可以将 Logstash 与 JSON 过滤器一起使用来做到这一点,但我想知道是否有一种方法可以仅使用 Kafka Connect 来做到这一点。

听起来像 Single Message Transform (thus applicable to any connector, not just ES), but there aren't any out of the box doing what you describe. The API is documented here

我有一个类似的问题,但相反。我在 Json 中有数据,我需要将其中一些数据转换为 Json 字符串表示形式,以便使用 Cassandra Sink 将其存储在 Cassandra 中。我最终 创建了一个 Kafka 流应用程序 从主题中读取,然后将 Json 对象输出到另一个主题 连接器。

topic 文档 <- 通过调用 mapValues 由您的 kafka 流读取或创建一个 Jackson POJO 以根据需要进行序列化,然后将值写入 -> topic document.elasticsearch

您可以使用 FromJson 转换器。 请查看此 link 了解更多详情 https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-json-schema/transformations/examples/FromJson.inline.html