是否可以为一个数据库使用 debezium 和 kafka 建立一个 Elasticsearch 索引?
Is it possible to have one Elasticsearch Index for one database with tables using debezium and kafka?
我有这个连接器和接收器,它基本上创建了一个主题
“Test.dbo.TEST_A”并写入 ES 索引“Test”。我已经设置了 "key.ignore": "false" 这样行更新也会在 ES 和
"transforms.unwrap.add.fields":"table" 跟踪文档属于哪个table。
{
"name": "Test-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"database.hostname": "192.168.1.234",
"database.port": "1433",
"database.user": "user",
"database.password": "pass",
"database.dbname": "Test",
"database.server.name": "MyServer",
"table.include.list": "dbo.TEST_A",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.testA",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields":"table"
}
}
{
"name": "elastic-sink-test",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "TEST_A",
"connection.url": "http://localhost:9200/",
"string.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schema.enable": "false",
"schema.ignore": "true",
"transforms": "topicRoute,unwrap,key",
"transforms.topicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.topicRoute.regex": "(.*).dbo.TEST_A", /* Use the database name */
"transforms.topicRoute.replacement": "",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones": "false",
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.key.field": "Id",
"key.ignore": "false",
"type.name": "TEST_A",
"behavior.on.null.values": "delete"
}
}
但是当我添加另一个 connector/sink 以包含数据库中的另一个 table“TEST_B”时。
似乎每当来自 TEST_A 和 TEST_B 的 id 相同时,从 ES?
中删除了行中的一个
此设置是否可能有一个索引 = 一个数据库,或者是每个 table 有一个索引的唯一解决方案?
我想要一个索引=一个数据库的原因是当更多的数据库被添加到 ES 时减少索引的数量。
您正在读取来自不同 Databases/Tables 的数据更改并将它们写入同一个 ElasticSearch 索引,并将 ES 文档 ID 设置为 DB 记录 ID。如您所见,如果数据库记录 ID 发生冲突,索引文档 ID 也会发生冲突,导致旧文档被删除。
这里有几个选项:
- 每个 DB/Table 名称的 ElasticSearch 索引:您可以使用不同的连接器或自定义单消息转换 (SMT)
来实现它
- 全局唯一的数据库记录:如果您控制源表的架构,则可以将主键设置为 UUID。这将防止 ID 冲突。
- 正如您在评论中提到的,将 ES 文档 ID 设置为 DB/Table/ID。您可以使用 SMT
实施此更改
我有这个连接器和接收器,它基本上创建了一个主题 “Test.dbo.TEST_A”并写入 ES 索引“Test”。我已经设置了 "key.ignore": "false" 这样行更新也会在 ES 和 "transforms.unwrap.add.fields":"table" 跟踪文档属于哪个table。
{
"name": "Test-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"database.hostname": "192.168.1.234",
"database.port": "1433",
"database.user": "user",
"database.password": "pass",
"database.dbname": "Test",
"database.server.name": "MyServer",
"table.include.list": "dbo.TEST_A",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.testA",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields":"table"
}
}
{
"name": "elastic-sink-test",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "TEST_A",
"connection.url": "http://localhost:9200/",
"string.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schema.enable": "false",
"schema.ignore": "true",
"transforms": "topicRoute,unwrap,key",
"transforms.topicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.topicRoute.regex": "(.*).dbo.TEST_A", /* Use the database name */
"transforms.topicRoute.replacement": "",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones": "false",
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.key.field": "Id",
"key.ignore": "false",
"type.name": "TEST_A",
"behavior.on.null.values": "delete"
}
}
但是当我添加另一个 connector/sink 以包含数据库中的另一个 table“TEST_B”时。 似乎每当来自 TEST_A 和 TEST_B 的 id 相同时,从 ES?
中删除了行中的一个此设置是否可能有一个索引 = 一个数据库,或者是每个 table 有一个索引的唯一解决方案? 我想要一个索引=一个数据库的原因是当更多的数据库被添加到 ES 时减少索引的数量。
您正在读取来自不同 Databases/Tables 的数据更改并将它们写入同一个 ElasticSearch 索引,并将 ES 文档 ID 设置为 DB 记录 ID。如您所见,如果数据库记录 ID 发生冲突,索引文档 ID 也会发生冲突,导致旧文档被删除。
这里有几个选项:
- 每个 DB/Table 名称的 ElasticSearch 索引:您可以使用不同的连接器或自定义单消息转换 (SMT) 来实现它
- 全局唯一的数据库记录:如果您控制源表的架构,则可以将主键设置为 UUID。这将防止 ID 冲突。
- 正如您在评论中提到的,将 ES 文档 ID 设置为 DB/Table/ID。您可以使用 SMT 实施此更改