Debezium - 自定义有效负载 - MySQL 连接器
Debezium - Custom Payload - MySQL Connector
我正在使用 Debezium 将数据从 MySQL 同步到 S3。现在我想做一些改变。
样本插入:
create table new (id int);
insert into new (1);
1。自定义负载
{
"schema": {
"type": "struct",
bla bla bla
"optional": false,
"name": "_72.31.84.129.test.new.Envelope"
},
"payload": {
"before": null,
"after": {
"id": 10
},
"source": {
"version": "0.10.0.Final",
"connector": "mysql",
"name": "11.11.84.129",
"ts_ms": 1576605998000,
"snapshot": "false",
"db": "test",
"table": "new",
"server_id": 1,
"gtid": "3a7b90e9-207e-11ea-b3ed-121a0cbac3cb:51",
"file": "mysql-bin.000003",
"pos": 12770,
"row": 0,
"thread": 47,
"query": null
},
"op": "c",
"ts_ms": 1576605998231
}
}
我只想推送带有一些自定义更改的负载选项。我需要在 payload.after
.
中包含 source,op,ts_ms
预期输出:
{
"id": 10,
"source": {
"version": "0.10.0.Final",
"connector": "mysql",
"name": "11.11.84.129",
"ts_ms": 1576605998000,
"snapshot": "false",
"db": "test",
"table": "new",
"server_id": 1,
"gtid": "3a7b90e9-207e-11ea-b3ed-121a0cbac3cb:51",
"file": "mysql-bin.000003",
"pos": 12770,
"row": 0,
"thread": 47,
"query": null
},
"op": "c",
"ts_ms": 1576605998231
}
我不想要架构,payload.before。我不确定如何获得此输出。
查看 SMT 以提取新记录状态。它只会传播 after
中的内容。或者,您也可以让它添加从 source
中选择的字段。
...
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.source.fields=table,lsn
...
您不能插入 op
和 ts_ms
字段 atm.,但它们可以作为消息传播 headers。
我正在使用 Debezium 将数据从 MySQL 同步到 S3。现在我想做一些改变。
样本插入:
create table new (id int);
insert into new (1);
1。自定义负载
{
"schema": {
"type": "struct",
bla bla bla
"optional": false,
"name": "_72.31.84.129.test.new.Envelope"
},
"payload": {
"before": null,
"after": {
"id": 10
},
"source": {
"version": "0.10.0.Final",
"connector": "mysql",
"name": "11.11.84.129",
"ts_ms": 1576605998000,
"snapshot": "false",
"db": "test",
"table": "new",
"server_id": 1,
"gtid": "3a7b90e9-207e-11ea-b3ed-121a0cbac3cb:51",
"file": "mysql-bin.000003",
"pos": 12770,
"row": 0,
"thread": 47,
"query": null
},
"op": "c",
"ts_ms": 1576605998231
}
}
我只想推送带有一些自定义更改的负载选项。我需要在 payload.after
.
source,op,ts_ms
预期输出:
{
"id": 10,
"source": {
"version": "0.10.0.Final",
"connector": "mysql",
"name": "11.11.84.129",
"ts_ms": 1576605998000,
"snapshot": "false",
"db": "test",
"table": "new",
"server_id": 1,
"gtid": "3a7b90e9-207e-11ea-b3ed-121a0cbac3cb:51",
"file": "mysql-bin.000003",
"pos": 12770,
"row": 0,
"thread": 47,
"query": null
},
"op": "c",
"ts_ms": 1576605998231
}
我不想要架构,payload.before。我不确定如何获得此输出。
查看 SMT 以提取新记录状态。它只会传播 after
中的内容。或者,您也可以让它添加从 source
中选择的字段。
...
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.source.fields=table,lsn
...
您不能插入 op
和 ts_ms
字段 atm.,但它们可以作为消息传播 headers。