org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic
When trying to create an Elasticsearch sink connector from the ksql CLI, I get the following error:
ERROR WorkerSinkTask{id=SINK_ELASTIC_TEST_JSON_A-0} Error converting
message value in topic 'REROUTES_TABLE' partition 0 at offset 939 and
timestamp 1641056495920: Converting byte[] to Kafka Connect data
failed due to serialization error of topic REROUTES_TABLE:
(org.apache.kafka.connect.runtime.WorkerSinkTask)
Caused by: org.apache.kafka.common.errors.SerializationException:
Error deserializing JSON message for id 30 Caused by:
java.net.ConnectException: Connection refused (Connection refused) at
java.base/java.net.PlainSocketImpl.socketConnect(Native Method) at
java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
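The "Error deserializing JSON message for id 30" line points at the Confluent wire format: every message written with a Schema Registry-aware serializer (JSON_SR included) starts with a magic byte 0x00 followed by a 4-byte big-endian schema id, which the converter then has to look up in the registry over HTTP. A minimal sketch of that framing (the helper name and the sample frame are illustrative, not taken from the connector):

```python
import json
import struct

def decode_confluent_frame(payload: bytes):
    """Split a Confluent-framed message into (schema_id, value).

    Wire format: one magic byte (0x00), a 4-byte big-endian
    schema id, then the serialized value itself.
    """
    if payload[0] != 0:
        raise ValueError("not Confluent wire format")
    schema_id = struct.unpack(">I", payload[1:5])[0]
    return schema_id, json.loads(payload[5:])

# A frame for schema id 30 carrying the record shown later in the question.
frame = (b"\x00" + struct.pack(">I", 30)
         + b'{"STEP_CNT":1,"TOT_LEN":0.0013573977968634994}')
schema_id, value = decode_confluent_frame(frame)
# schema_id is 30, so the converter must fetch schema 30 from the registry;
# if that HTTP call fails, deserialization fails with the error above.
```

This is why a sink connector can fail on messages that ksql itself prints fine: the converter cannot decode the value without reaching the registry.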
The command used to create it looks like this:
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_JSON_A WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elasticsearch:9200',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'io.confluent.connect.json.JsonSchemaConverter',
'value.converter.schema.registry.url' = 'http://localhost:8081',
'value.converter.schemas.enable' = 'true',
'type.name' = '_doc',
'topics' = 'REROUTES_TABLE',
'key.ignore' = 'false',
'schema.ignore' = 'false'
);
The data looks like this:
ksql> print REROUTES_TABLE from beginning limit 1;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON_SR or KAFKA_STRING
rowtime: 2021/12/26 06:22:33.726 Z, key: 0, value: {"STEP_CNT":1,"TOT_LEN":0.0013573977968634994}, partition: 0
Topic printing ceased
The schema for the topic value is:
{"subject":"REROUTES_TABLE-value","version":1,"id":30,"schemaType":"JSON","schema":"{\"type\":\"object\",\"properties\":{\"STEP_CNT\":{\"connect.index\":0,\"oneOf\":[{\"type\":\"null\"},{\"type\":\"integer\",\"connect.type\":\"int64\"}]},\"TOT_LEN\":{\"connect.index\":1,\"oneOf\":[{\"type\":\"null\"},{\"type\":\"number\",\"connect.type\":\"float64\"}]}}}"}
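Note that in the registry response the schema field is itself a JSON-encoded string, so it has to be parsed twice. Unwrapping it (a sketch built from the fields of the response above) also shows that both columns are declared nullable via oneOf, which is how ksqlDB normally emits JSON schemas and is not, by itself, a deserialization problem:

```python
import json

# Registry response for REROUTES_TABLE-value, as shown above; the inner
# schema is stored as a string and must be parsed a second time.
response = {
    "subject": "REROUTES_TABLE-value",
    "version": 1,
    "id": 30,
    "schemaType": "JSON",
    "schema": '{"type":"object","properties":{"STEP_CNT":{"connect.index":0,'
              '"oneOf":[{"type":"null"},{"type":"integer","connect.type":"int64"}]},'
              '"TOT_LEN":{"connect.index":1,"oneOf":[{"type":"null"},'
              '{"type":"number","connect.type":"float64"}]}}}',
}
schema = json.loads(response["schema"])
# Each property allows null via oneOf [null, <type>] -- the nullable wrapper
# ksqlDB generates for table columns.
assert "null" in [t["type"] for t in schema["properties"]["STEP_CNT"]["oneOf"]]
```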
REROUTES_TABLE is built on top of a stream, performing some aggregations over the stream data.
I half suspect there is a null value the deserializer cannot handle. But since REROUTES_TABLE successfully performs its aggregations over the stream, how and where would a null come from, and more importantly, how can this be fixed (even if my assumption about nulls is wrong)?
The problem was in my connector settings:
'value.converter.schema.registry.url' = 'http://localhost:8081',
whereas it should have been
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
The converter runs inside the Kafka Connect worker's container, where localhost refers to the worker itself rather than the Schema Registry host, which is why the stack trace ends in Connection refused.