具有 json 架构的 Kafka jdbc 接收器连接器无法正常工作
Kafka jdbc sink connector with json schema not working
使用最新的 kafka 和 confluent jdbc 接收器连接器。发送一个非常简单的 Json 消息:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "msg"
}
],
"optional": false,
"name": "msgschema"
},
"payload": {
"id": 222,
"msg": "hi"
}
}
但是出现错误:
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
Jsonlint 表示 Json 有效。我在 kafka 配置中保留了 json schemas.enable=true
。有什么指点吗?
为了使用 JDBC 接收器,您的流式传输消息必须具有架构。这可以通过将 Avro 与模式注册表一起使用,或通过将 JSON 与模式一起使用来实现。如果在最初 运行 源属性文件之后配置了 schemas.enable=true
,您可能需要删除主题,re-run 接收器,然后再次启动源端。
示例:
sink.properties
文件
name=sink-mysql
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=test-mysql-jdbc-foobar
connection.url=jdbc:mysql://127.0.0.1:3306/demo?user=user1&password=user1pass
auto.create=true
和一个示例工作器配置文件 connect-avro-standalone.properties
:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=share/java
并执行
./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/sink.properties
您需要告诉 Connect 您的架构 是 嵌入到您正在使用的 JSON 中。
你有:
value.converter=org.apache.kafka.connect.json.JsonConverter
但还需要:
value.converter.schemas.enable=true
使用最新的 kafka 和 confluent jdbc 接收器连接器。发送一个非常简单的 Json 消息:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "msg"
}
],
"optional": false,
"name": "msgschema"
},
"payload": {
"id": 222,
"msg": "hi"
}
}
但是出现错误:
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
Jsonlint 表示 Json 有效。我在 kafka 配置中保留了 json schemas.enable=true
。有什么指点吗?
为了使用 JDBC 接收器,您的流式传输消息必须具有架构。这可以通过将 Avro 与模式注册表一起使用,或通过将 JSON 与模式一起使用来实现。如果在最初 运行 源属性文件之后配置了 schemas.enable=true
,您可能需要删除主题,re-run 接收器,然后再次启动源端。
示例:
sink.properties
文件
name=sink-mysql
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=test-mysql-jdbc-foobar
connection.url=jdbc:mysql://127.0.0.1:3306/demo?user=user1&password=user1pass
auto.create=true
和一个示例工作器配置文件 connect-avro-standalone.properties
:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=share/java
并执行
./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/sink.properties
您需要告诉 Connect 您的架构 是 嵌入到您正在使用的 JSON 中。
你有:
value.converter=org.apache.kafka.connect.json.JsonConverter
但还需要:
value.converter.schemas.enable=true