通过 Debezium 复制 MySQL 到 PostgreSQL 时数据类型信息丢失
Data type information lost when replication MySQL to PostgreSQL via Debezium
我需要将 MySQL 数据库复制到 PostgreSQL 数据库。我选择了:
- Debezium 连接
- Avro 格式
- 汇合模式注册表
- 卡夫卡
正在复制数据,但是,我丢失了一些模式信息。例如,mysql 中具有 datetime
格式的列在 Postgres 中被复制为 bigint
,不创建外键,也不保留列的顺序(这很好)等..
PostgreSQL 接收器连接器:
{
"name": "jdbc-sink-dbt",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "2",
"topics.regex": "test_(.*)",
"connection.url": "jdbc:postgresql://dbt-postgres:5432/test?user=postgres&password=postgres",
"transforms": "unwrap,removePrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.removePrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.removePrefix.regex": "test_([^.]+)",
"transforms.removePrefix.replacement": "",
"auto.create": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.fields": "id",
"pk.mode": "record_key",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
MySQL 连接器:
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "172.17.0.1",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.allowPublicKeyRetrieval": "true",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "test",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.test",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\.([^.]+)\.([^.]+)",
"transforms.route.replacement": "_",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
Debezium 连接配置:
KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
For example, a column with datetime format in mysql is replicated as bigint
这是由于 Debezium 连接器在源端使用的默认值 time.precision.mode
。如果您查看 documentation,您会注意到默认精度将 datetime
列作为 INT64 发出,这解释了为什么接收器连接器将内容写入 bigint
.
您现在可以在源端将 time.precision.mode
设置为 connect
,以便 JDBC 接收器连接器可以正确解释这些值。
foreign keys are not created
这是意料之中的,请参阅此 Confluent GitHub Issue。目前,JDBC 接收器不具备支持在 JDBC 级别实现外键关系的能力。
order of columns is not preserved
这也在意料之中。没有预期的保证 Debezium 应该以与数据库中完全相同的顺序存储关系列(尽管我们这样做)并且 JDBC 接收器连接器不能保证保留字段的顺序,因为它们' 从发出的事件中读取。如果接收器连接器使用像 HashMap 这样的容器来存储列名,那么顺序可能与源数据库有很大不同。
如果需要在镜像源系统的目标系统中保留更高级别的关系元数据(例如外键和列顺序),您可能需要查看单独的工具链来复制初始数据通过某种类型的模式转储、转换和导入到目标数据库来模式和关系,然后依赖 CDC 管道进行数据复制。
我需要将 MySQL 数据库复制到 PostgreSQL 数据库。我选择了:
- Debezium 连接
- Avro 格式
- 汇合模式注册表
- 卡夫卡
正在复制数据,但是,我丢失了一些模式信息。例如,mysql 中具有 datetime
格式的列在 Postgres 中被复制为 bigint
,不创建外键,也不保留列的顺序(这很好)等..
PostgreSQL 接收器连接器:
{
"name": "jdbc-sink-dbt",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "2",
"topics.regex": "test_(.*)",
"connection.url": "jdbc:postgresql://dbt-postgres:5432/test?user=postgres&password=postgres",
"transforms": "unwrap,removePrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.removePrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.removePrefix.regex": "test_([^.]+)",
"transforms.removePrefix.replacement": "",
"auto.create": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.fields": "id",
"pk.mode": "record_key",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
MySQL 连接器:
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "172.17.0.1",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.allowPublicKeyRetrieval": "true",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "test",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.test",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\.([^.]+)\.([^.]+)",
"transforms.route.replacement": "_",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
Debezium 连接配置:
KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
For example, a column with datetime format in mysql is replicated as bigint
这是由于 Debezium 连接器在源端使用的默认值 time.precision.mode
。如果您查看 documentation,您会注意到默认精度将 datetime
列作为 INT64 发出,这解释了为什么接收器连接器将内容写入 bigint
.
您现在可以在源端将 time.precision.mode
设置为 connect
,以便 JDBC 接收器连接器可以正确解释这些值。
foreign keys are not created
这是意料之中的,请参阅此 Confluent GitHub Issue。目前,JDBC 接收器不具备支持在 JDBC 级别实现外键关系的能力。
order of columns is not preserved
这也在意料之中。没有预期的保证 Debezium 应该以与数据库中完全相同的顺序存储关系列(尽管我们这样做)并且 JDBC 接收器连接器不能保证保留字段的顺序,因为它们' 从发出的事件中读取。如果接收器连接器使用像 HashMap 这样的容器来存储列名,那么顺序可能与源数据库有很大不同。
如果需要在镜像源系统的目标系统中保留更高级别的关系元数据(例如外键和列顺序),您可能需要查看单独的工具链来复制初始数据通过某种类型的模式转储、转换和导入到目标数据库来模式和关系,然后依赖 CDC 管道进行数据复制。