How to disable JSON schema in Kafka Source Connector (e.g. Debezium)
I followed the Debezium tutorial (https://github.com/debezium/debezium-examples/tree/master/tutorial#using-postgres), and all the CDC data captured from Postgres is published to the Kafka topics as JSON with an embedded schema. How do I get rid of the schema?
Here is the connector configuration (the connector is created inside the Docker container):
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.server.name": "dbserver1",
    "schema.include": "inventory"
  }
}
The JSON schema is still present in the messages.
I only managed to get rid of it by starting the Docker container with the following environment variables:
- CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
- CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
Why can't I achieve exactly the same effect through the connector configuration?
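For context, the debezium/connect image appears to translate CONNECT_*-prefixed environment variables into worker-level properties (prefix stripped, lowercased, underscores turned into dots), so the two variables above would roughly end up in the worker configuration as shown below. This is an assumed sketch of the mapping, not the exact generated file:
# Assumed worker-level result of the two CONNECT_* variables
# (in the connect-distributed.properties written by the image's entrypoint)
key.converter.schemas.enable=false
value.converter.schemas.enable=false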
Example of a Kafka message with the schema:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1001}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"1.4.1.Final","connector":"postgresql","name":"dbserver1","ts_ms":1611918971029,"snapshot":"true","db":"postgres","schema":"inventory","table":"customers","txId":602,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1611918971032,"transaction":null}}
Example of what I want, without the schema:
{"id":1001} {"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"1.4.1.Final","connector":"postgresql","name":"dbserver1","ts_ms":1611920304594,"snapshot":"true","db":"postgres","schema":"inventory","table":"customers","txId":597,"lsn":33809448,"xmin":null},"op":"r","ts_ms":1611920304596,"transaction":null}
The Debezium Connect container is run with the following command:
docker run -it --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses -e CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:1.3
or with docker-compose:
connect:
  image: debezium/connect:${DEBEZIUM_VERSION}
  ports:
    - 8083:8083
  links:
    - kafka
    - postgres
  environment:
    - BOOTSTRAP_SERVERS=kafka:9092
    - GROUP_ID=1
    - CONFIG_STORAGE_TOPIC=my_connect_configs
    - OFFSET_STORAGE_TOPIC=my_connect_offsets
    - STATUS_STORAGE_TOPIC=my_connect_statuses
    - CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
    - CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false and CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false are the lines I added later, but without them I could not get rid of the schema.
The connect Docker container (the Kafka Connect worker cluster, if I understand it correctly) starts without any connectors; I create the connector manually.
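For completeness, the connector is registered manually through the Kafka Connect REST API, roughly like this (register-postgres.json is an assumed file name containing the JSON configuration shown above):
# Register the connector with the Connect REST API of the running connect container
curl -i -X POST \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  http://localhost:8083/connectors/ \
  -d @register-postgres.json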
Logs from the connect container, written while the connector is being created:
connect_1 | 2021-01-29 18:04:57,395 INFO || JsonConverterConfig values:
connect_1 | converter.type = key
connect_1 | decimal.format = BASE64
connect_1 | schemas.cache.size = 1000
connect_1 | schemas.enable = true
connect_1 | [org.apache.kafka.connect.json.JsonConverterConfig]
connect_1 | 2021-01-29 18:04:57,396 INFO || Set up the key converter class org.apache.kafka.connect.json.JsonConverter for task inventory-connector-0 using the worker config [org.apache.kafka.connect.runtime.Worker]
connect_1 | 2021-01-29 18:04:57,396 INFO || JsonConverterConfig values:
connect_1 | converter.type = value
connect_1 | decimal.format = BASE64
connect_1 | schemas.cache.size = 1000
connect_1 | schemas.enable = true
connect_1 | [org.apache.kafka.connect.json.JsonConverterConfig]
...
connect_1 | 2021-01-29 18:04:57,458 INFO || Starting PostgresConnectorTask with configuration: [io.debezium.connector.common.BaseSourceTask]
connect_1 | 2021-01-29 18:04:57,460 INFO || key.converter.schemas.enable = false [io.debezium.connector.common.BaseSourceTask]
connect_1 | 2021-01-29 18:04:57,460 INFO || value.converter.schemas.enable = false [io.debezium.connector.common.BaseSourceTask]
Here is the output of fetching the connector:
$ curl -i http://localhost:8083/connectors/inventory-connector
{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector",**"key.converter.schemas.enable":"false"**,"database.user":"postgres","database.dbname":"postgres","tasks.max":"1","database.hostname":"postgres","database.password":"postgres",**"value.converter.schemas.enable":"false"**,"name":"inventory-connector","database.server.name":"dbserver1","database.port":"5432","schema.include":"inventory"},"tasks":[{"connector":"inventory-connector","task":0}],"type":"source"}
I reproduced this example. As @OneCricketeer mentioned in the comments, you have to explicitly add the JsonConverter to the connector configuration. Without key.converter / value.converter set at the connector level, the tasks fall back to the worker's converters (where schemas.enable is true in your setup, as the log above shows), and the per-connector *.schemas.enable entries are effectively ignored:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.server.name": "dbserver1",
    "schema.include": "inventory"
  }
}
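To apply the corrected converter settings to a connector that is already registered, one option is the standard Kafka Connect REST endpoint PUT /connectors/{name}/config, which takes only the config map (without the surrounding name/config wrapper), for example:
# Update the existing connector in place with the converter overrides
curl -i -X PUT \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  http://localhost:8083/connectors/inventory-connector/config \
  -d '{
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.server.name": "dbserver1",
    "schema.include": "inventory"
  }'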