ElasticsearchSinkConnector 无法将数据反序列化为 Avro
ElasticsearchSinkConnector Failed to deserialize data to Avro
我创建了最简单的 kafka sink 连接器配置并且我使用的是 confluent 4.1.0:
{
"connector.class":
"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type.name": "test-type",
"tasks.max": "1",
"topics": "dialogs",
"name": "elasticsearch-sink",
"key.ignore": "true",
"connection.url": "http://localhost:9200",
"schema.ignore": "true"
}
并且在主题中我将消息保存在 JSON
{ "topics": "resd"}
但结果我得到一个错误:
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
发生该错误是因为它试图读取非 Confluent Schema Registry 编码的 Avro 消息。
如果主题数据是Avro,需要使用Schema Registry。
否则,如果主题数据是 JSON,那么您已经在 属性 文件中的键或值上使用 AvroConverter 启动了连接集群,您需要在其中使用 JsonConverter
正如 cricket_007 所说,您需要告诉 Connect 使用 Json 反序列化器,如果这是您的数据所采用的格式。将其添加到您的连接器配置中:
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false"
我创建了最简单的 kafka sink 连接器配置并且我使用的是 confluent 4.1.0:
{
"connector.class":
"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type.name": "test-type",
"tasks.max": "1",
"topics": "dialogs",
"name": "elasticsearch-sink",
"key.ignore": "true",
"connection.url": "http://localhost:9200",
"schema.ignore": "true"
}
并且在主题中我将消息保存在 JSON
{ "topics": "resd"}
但结果我得到一个错误:
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1 Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
发生该错误是因为它试图读取非 Confluent Schema Registry 编码的 Avro 消息。
如果主题数据是Avro,需要使用Schema Registry。
否则,如果主题数据是 JSON,那么您已经在 属性 文件中的键或值上使用 AvroConverter 启动了连接集群,您需要在其中使用 JsonConverter
正如 cricket_007 所说,您需要告诉 Connect 使用 Json 反序列化器,如果这是您的数据所采用的格式。将其添加到您的连接器配置中:
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false"