ES Sink connector (Debezium) stops with error
To understand how CDC works, I have been following the example given on the Debezium site: https://debezium.io/blog/2018/03/08/creating-ddd-aggregates-with-debezium-and-kafka-streams/.
In that setup, if I try to change the sink connector from MongoDB to Elasticsearch and then start the es-sink connector, it fails with the following error:
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
The MySQL Debezium source connector properties are as follows (please ignore the placeholder URLs/hostnames):
{
  "name": "mysql-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "cdc",
    "database.password": "passwrod",
    "database.server.id": "1840514",
    "database.server.name": "dbserver1",
    "table.whitelist": "inventory.customers,inventory.addresses",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.unwrap.drop.tombstones": "false"
  }
}
The Elasticsearch sink connector is the one used in this example:
https://debezium.io/blog/2018/01/17/streaming-to-elasticsearch/
The Elastic sink connector properties are as follows (please ignore the placeholder URLs):
{
  "name": "elastic-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "product-cdc,final_ddd_aggregates,dbserver1.inventory.customers,dbserver1.inventory.addresses",
    "connection.url": "https://localhost:9243",
    "transforms": "unwrap,key",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "id",
    "key.ignore": "false",
    "schema.ignore": "true",
    "value.converter.schemas.enable": "true",
    "type.name": "final_ddd_aggregates"
  }
}
Please assist me.
As the error message suggests, you probably have JSON messages without an embedded schema stored in the topic you are reading from. You need to either enable schemas on the source side or disable them on the sink side.
Please see this FAQ entry for more details.
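To illustrate the difference (a minimal hypothetical example, not taken from your actual topic): with schemas.enable=true the JsonConverter expects every message to be wrapped in a schema/payload envelope, something like

{
  "schema": {
    "type": "struct",
    "fields": [ { "type": "int32", "optional": false, "field": "id" } ],
    "optional": false
  },
  "payload": { "id": 1001 }
}

whereas plain JSON written without schemas carries only the bare document

{ "id": 1001 }

and the converter fails with exactly the error you are seeing when it finds the latter while expecting the former.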
In your configuration you need to do what the error message tells you and set schemas.enable=false. Using the example from the article, instead of:
{
  "name": "mongodb-sink",
  "config": {
    "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
    "tasks.max": "1",
    "topics": "final_ddd_aggregates",
    "mongodb.connection.uri": "mongodb://mongodb:27017/inventory?w=1&journal=true",
    "mongodb.collection": "customers_with_addresses",
    "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.FullKeyStrategy",
    "mongodb.delete.on.null.values": true
  }
}
you would have:
{
  "name": "mongodb-sink",
  "config": {
    "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
    "tasks.max": "1",
    "topics": "final_ddd_aggregates",
    "mongodb.connection.uri": "mongodb://mongodb:27017/inventory?w=1&journal=true",
    "mongodb.collection": "customers_with_addresses",
    "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.FullKeyStrategy",
    "mongodb.delete.on.null.values": true,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false"
  }
}
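Applied to your own elastic-sink connector, a sketch might look like this. It is an assumption on my part that the topics you list hold plain JSON without embedded schemas (which is what the error indicates); your original settings are kept unchanged apart from adding the converter overrides and switching value.converter.schemas.enable from "true" to "false":

{
  "name": "elastic-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "product-cdc,final_ddd_aggregates,dbserver1.inventory.customers,dbserver1.inventory.addresses",
    "connection.url": "https://localhost:9243",
    "transforms": "unwrap,key",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "id",
    "key.ignore": "false",
    "schema.ignore": "true",
    "type.name": "final_ddd_aggregates",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}

Since schema.ignore is already set to true, Elasticsearch will infer the mapping itself, so the sink does not need a schema from the converter.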
To learn more about converters and serialization, see https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained and also http://rmoff.dev/ksldn19-kafka-connect