Is it possible to sink Kafka messages generated by Debezium to Snowflake?
I am using the debezium-ui repo to test the Debezium MySQL CDC feature, and the messages flow into Kafka correctly. The request body used to create the MySQL connector is as follows:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "dbzui-db-mysql",
    "database.port": "3306",
    "database.user": "mysqluser",
    "database.password": "mysql",
    "database.server.id": "184054",
    "database.server.name": "inventory-connector-mysql",
    "database.include.list": "inventory",
    "database.history.kafka.bootstrap.servers": "dbzui-kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory"
  }
}
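For context, what Debezium writes to Kafka for each change to inventory.orders is not a flat row but a change-event envelope (before/after images, source metadata, and the operation type). With the default JSON converter and schemas.enable=true (the default in the Debezium images), the value is additionally wrapped in a schema/payload pair. Below is a rough, abbreviated sketch of a create event; the column names come from the Debezium tutorial's sample inventory database, the values are illustrative, and the schema and source objects are elided:
{
  "schema": { },
  "payload": {
    "before": null,
    "after": {
      "order_number": 10001,
      "order_date": 16816,
      "purchaser": 1001,
      "quantity": 1,
      "product_id": 102
    },
    "source": { },
    "op": "c",
    "ts_ms": 1620000000000
  }
}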
Then I need to load those Kafka messages into Snowflake, the data warehouse my team uses, so I created a Snowflake sink connector with the following request body:
{
  "name": "kafka2-04",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max": 1,
    "topics": "inventory-connector-mysql.inventory.orders",
    "snowflake.topic2table.map": "inventory-connector-mysql.inventory.orders:tbl_orders",
    "snowflake.url.name": "**.snowflakecomputing.com",
    "snowflake.user.name": "kafka_connector_user_1",
    "snowflake.private.key": "*******",
    "snowflake.private.key.passphrase": "",
    "snowflake.database.name": "kafka_db",
    "snowflake.schema.name": "kafka_schema",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
    "header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
    "value.converter.schemas.enable": "true"
  }
}
But after it runs, the data that lands in Snowflake looks like this: data in snowflake. The schema of the Snowflake table is different from that of the MySQL table. Is my sink connector misconfigured, or is it simply not possible to sink Debezium-generated Kafka data with the SnowflakeSinkConnector?
This is the default behavior in Snowflake, documented here:
Every Snowflake table loaded by the Kafka connector has a schema consisting of two VARIANT columns:
RECORD_CONTENT. This contains the Kafka message.
RECORD_METADATA. This contains metadata about the message, for example, the topic from which the message was read.
If Snowflake creates the table, then the table contains only these two columns. If the user creates the table for the Kafka connector to add rows to, then the table can contain more than these two columns (any additional columns must allow NULL values, because the data from the connector does not include values for those columns).
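Since the sink table only exposes these two VARIANT columns, the usual pattern is to flatten RECORD_CONTENT with Snowflake's JSON path syntax, for example in a view. A minimal sketch, assuming the Debezium envelope shown above and the inventory.orders column names from the sample database; adjust the names and paths to match your actual payload:
CREATE OR REPLACE VIEW kafka_db.kafka_schema.v_orders AS
SELECT
    RECORD_CONTENT:payload:after:order_number::NUMBER AS order_number,
    RECORD_CONTENT:payload:after:purchaser::NUMBER    AS purchaser,
    RECORD_CONTENT:payload:after:quantity::NUMBER     AS quantity,
    RECORD_CONTENT:payload:after:product_id::NUMBER   AS product_id,
    RECORD_CONTENT:payload:op::STRING                 AS cdc_op,       -- c (create), u (update), d (delete), r (snapshot read)
    RECORD_METADATA:topic::STRING                     AS source_topic
FROM kafka_db.kafka_schema.tbl_orders
WHERE RECORD_CONTENT:payload:after IS NOT NULL;       -- delete events carry a null "after"
If your events are serialized without the schema/payload wrapper, drop the :payload step from the paths.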