Is it possible to sink Kafka messages generated by Debezium into Snowflake?

I am testing Debezium's MySQL CDC feature with the debezium-ui repo, and the messages flow into Kafka as expected. The request body used to create the MySQL connector is as follows:

    {
      "name": "inventory-connector",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "dbzui-db-mysql",
        "database.port": "3306",
        "database.user": "mysqluser",
        "database.password": "mysql",
        "database.server.id": "184054",
        "database.server.name": "inventory-connector-mysql",
        "database.include.list": "inventory",
        "database.history.kafka.bootstrap.servers": "dbzui-kafka:9092",
        "database.history.kafka.topic": "dbhistory.inventory"
      }
    }
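For reference, what Debezium writes to each topic is not a plain row image but a JSON change-event envelope. A trimmed sketch of an insert event on the orders topic, with the sibling "schema" block omitted and column names taken from Debezium's inventory sample database (both illustrative):

    {
      "payload": {
        "before": null,
        "after": {
          "order_number": 10001,
          "order_date": 16816,
          "purchaser": 1001,
          "quantity": 1,
          "product_id": 102
        },
        "source": { "connector": "mysql", "db": "inventory", "table": "orders" },
        "op": "c",
        "ts_ms": 1622549812000
      }
    }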

Then I need to land the Kafka messages in Snowflake, the data warehouse my team uses. I created a Snowflake sink connector with the following request body:

    {
      "name": "kafka2-04",
      "config": {
        "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
        "tasks.max": 1,
        "topics": "inventory-connector-mysql.inventory.orders",
        "snowflake.topic2table.map": "inventory-connector-mysql.inventory.orders:tbl_orders",
        "snowflake.url.name": "**.snowflakecomputing.com",
        "snowflake.user.name": "kafka_connector_user_1",
        "snowflake.private.key": "*******",
        "snowflake.private.key.passphrase": "",
        "snowflake.database.name": "kafka_db",
        "snowflake.schema.name": "kafka_schema",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
        "header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
        "value.converter.schemas.enable": "true"
      }
    }
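For completeness, the connector is registered by POSTing this body to the Kafka Connect REST API, for example (assuming Connect listens on localhost:8083 and the body above is saved as snowflake-sink.json; adjust both to your setup):

    curl -X POST -H "Content-Type: application/json" \
        --data @snowflake-sink.json \
        http://localhost:8083/connectors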

But after it runs, the data lands in Snowflake like this: data in snowflake. The schema of the Snowflake table differs from the MySQL table. Is my sink connector configured incorrectly, or is it simply not possible to ingest Debezium-generated Kafka data with the SnowflakeSinkConnector?

This is the default behavior in Snowflake, as documented here:

Every Snowflake table loaded by the Kafka connector has a schema consisting of two VARIANT columns:

RECORD_CONTENT. This contains the Kafka message.

RECORD_METADATA. This contains metadata about the message, for example, the topic from which the message was read.

If Snowflake creates the table, it contains only these two columns. If the user creates the table for the Kafka connector to add rows to, the table can contain more than these two columns (any additional columns must accept NULL values, because the data from the connector does not include values for those columns).
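Since RECORD_CONTENT holds the entire Debezium envelope as a VARIANT, you can still expose a relational view on the Snowflake side by extracting the fields you need. A minimal sketch, assuming the envelope shape shown above and the sample orders columns (adjust the paths and names to your actual schema):

    CREATE OR REPLACE VIEW kafka_db.kafka_schema.v_orders AS
    SELECT
        record_content:payload.after.order_number::NUMBER AS order_number,
        record_content:payload.after.order_date::NUMBER   AS order_date,
        record_content:payload.after.purchaser::NUMBER    AS purchaser,
        record_content:payload.after.quantity::NUMBER     AS quantity,
        record_content:payload.after.product_id::NUMBER   AS product_id,
        record_metadata:offset::NUMBER                    AS kafka_offset
    FROM kafka_db.kafka_schema.tbl_orders
    -- keep inserts, updates, and snapshot reads; drop deletes
    WHERE record_content:payload.op::STRING IN ('c', 'u', 'r');

Alternatively, Debezium's io.debezium.transforms.ExtractNewRecordState SMT can unwrap the envelope on the Kafka Connect side, so that only the after-state of each row reaches the sink.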