是否可以为一个数据库使用 debezium 和 kafka 建立一个 Elasticsearch 索引?

Is it possible to have one Elasticsearch Index for one database with tables using debezium and kafka?

我有这个连接器和接收器,它基本上创建了一个主题 “Test.dbo.TEST_A”并写入 ES 索引“Test”。我已经设置了 "key.ignore": "false" 这样行更新也会在 ES 和 "transforms.unwrap.add.fields":"table" 跟踪文档属于哪个table。

{
    "name": "Test-connector", 
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", 
        "tasks.max": "1",
        "database.hostname": "192.168.1.234", 
        "database.port": "1433", 
        "database.user": "user", 
        "database.password": "pass", 
        "database.dbname": "Test", 
        "database.server.name": "MyServer",
        "table.include.list": "dbo.TEST_A",
        "database.history.kafka.bootstrap.servers": "kafka:9092", 
        "database.history.kafka.topic": "dbhistory.testA",

        "transforms": "unwrap",

        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "rewrite",
        "transforms.unwrap.add.fields":"table"
    }
}
{
    "name": "elastic-sink-test",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "TEST_A",
        "connection.url": "http://localhost:9200/",
        "string.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schema.enable": "false",
        "schema.ignore": "true",

        "transforms": "topicRoute,unwrap,key",

        "transforms.topicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.topicRoute.regex": "(.*).dbo.TEST_A",                          /* Use the database name */
        "transforms.topicRoute.replacement": "",

        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",    
        "transforms.unwrap.drop.tombstones": "false",    

        "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.key.field": "Id",       

        "key.ignore": "false",                                                        
        "type.name": "TEST_A",
        "behavior.on.null.values": "delete"                                                     
    }
}

但是当我添加另一个 connector/sink 以包含数据库中的另一个 table“TEST_B”时。 似乎每当来自 TEST_A 和 TEST_B 的 id 相同时,从 ES?

中删除了行中的一个

此设置是否可能有一个索引 = 一个数据库,或者是每个 table 有一个索引的唯一解决方案? 我想要一个索引=一个数据库的原因是当更多的数据库被添加到 ES 时减少索引的数量。

您正在读取来自不同 Databases/Tables 的数据更改并将它们写入同一个 ElasticSearch 索引,并将 ES 文档 ID 设置为 DB 记录 ID。如您所见,如果数据库记录 ID 发生冲突,索引文档 ID 也会发生冲突,导致旧文档被删除。

这里有几个选项:

  • 每个 DB/Table 名称的 ElasticSearch 索引:您可以使用不同的连接器或自定义单消息转换 (SMT)
  • 来实现它
  • 全局唯一的数据库记录:如果您控制源表的架构,则可以将主键设置为 UUID。这将防止 ID 冲突。
  • 正如您在评论中提到的,将 ES 文档 ID 设置为 DB/Table/ID。您可以使用 SMT
  • 实施此更改