是否可以重命名 debezium mysql 连接器的消息触发 运行 上的字段?

Is it possible to rename the fields on message triggered running of debezium mysql connector?

我已经配置了一个 debezium mysql 连接器,我需要在有效负载中包含附加字段作为 table 名称。我需要做哪些配置更改才能实现此目标?

table 名称已包含在 source.table 元素中。 Here's 插入到名为 rental 的 table 的示例消息:

{
  "before": null,
  "after": {
    "fullfillment.sakila.rental.Value": {
      "rental_id": 13346,
      "rental_date": 1124483301000,
      "inventory_id": 4541,
      "customer_id": 131,
      "return_date": {
        "long": 1125188901000
      },
      "staff_id": 2,
      "last_update": "2006-02-15T21:30:53Z"
    }
  },
  "source": {
    "name": "fullfillment",
    "server_id": 0,
    "ts_sec": 0,
    "gtid": null,
    "file": "mysql-bin.000002",
    "pos": 832,
    "row": 0,
    "snapshot": {
      "boolean": true
    },
    "thread": null,
    "db": {
      "string": "sakila"
    },
    "table": {
      "string": "rental"
    }
  },
  "op": "c",
  "ts_ms": {
    "long": 1518190060267
  }
}

如果您想插入额外的字段,您可以使用 InsertField$Value Single Message Transform, which you can see an example of in this article


编辑:

如果您希望该字段位于消息的不同部分,您有几种选择。您可以 post 使用 Kafka Streams 处理数据以根据需要重组数据。您可以使用可用的单一消息转换来展平 after 组件,然后添加静态值:

            "transforms": "unwrap,InsertTopic,InsertSourceDetails",
            "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
            "transforms.InsertTopic.type":"org.apache.kafka.connect.transforms.InsertField$Value",
            "transforms.InsertTopic.topic.field":"messagetopic",
            "transforms.InsertSourceDetails.type":"org.apache.kafka.connect.transforms.InsertField$Value",
            "transforms.InsertSourceDetails.static.field":"messagesource",
            "transforms.InsertSourceDetails.static.value":"Debezium CDC from Oracle on asgard"

或者您可以编写自己的单一消息转换来准确地进行您想要进行的修改。