Debezium Postgres 和 ElasticSearch - 在 ElasticSearch 中存储复杂对象
Debezium Postgres and ElasticSearch - Store complex Object in ElasticSearch
我在 Postgres 中有一个数据库,其中有一个 table“产品”,它与“sales_Channel”一一对应。所以 1 Product 可以有多个 SalesChannel。现在我想把它转移到 ES 并保持最新,所以我正在使用 debezium 和 kafka。把单个的table传给ES是没有问题的。我可以查询 SalesChannels 和 Products。但我需要将所有 SalesChannels 作为结果附加的产品。我如何让 debezium 转移这个?
产品映射
{
"settings": {
"number_of_shards": 1
},
"mappings": {
"_doc": {
"properties": {
"id": {
"type": "integer"
}
}
}
}
}
产品接收器
{
"name": "es-sink-product",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "product",
"connection.url": "http://elasticsearch:9200",
"transforms": "unwrap,key",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.drop.deletes": "false",
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.key.field": "id",
"key.ignore": "false",
"type.name": "_doc",
"behavior.on.null.values": "delete"
}
}
我在 Postgres 中有一个数据库,其中有一个 table“产品”,它与“sales_Channel”一一对应。所以 1 Product 可以有多个 SalesChannel。现在我想把它转移到 ES 并保持最新,所以我正在使用 debezium 和 kafka。把单个的table传给ES是没有问题的。我可以查询 SalesChannels 和 Products。但我需要将所有 SalesChannels 作为结果附加的产品。我如何让 debezium 转移这个?
产品映射
{
"settings": {
"number_of_shards": 1
},
"mappings": {
"_doc": {
"properties": {
"id": {
"type": "integer"
}
}
}
}
}
产品接收器
{
"name": "es-sink-product",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "product",
"connection.url": "http://elasticsearch:9200",
"transforms": "unwrap,key",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.drop.deletes": "false",
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.key.field": "id",
"key.ignore": "false",
"type.name": "_doc",
"behavior.on.null.values": "delete"
}
}