org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic

When trying to create an Elasticsearch sink connector from the ksql CLI, I get the following error:

ERROR WorkerSinkTask{id=SINK_ELASTIC_TEST_JSON_A-0} Error converting message value in topic 'REROUTES_TABLE' partition 0 at offset 939 and timestamp 1641056495920: Converting byte[] to Kafka Connect data failed due to serialization error of topic REROUTES_TABLE: (org.apache.kafka.connect.runtime.WorkerSinkTask)

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id 30
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
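The "for id 30" in this trace is the schema ID carried inside each record. Values written in ksqlDB's JSON_SR format use the Schema Registry wire format: a magic byte 0, a 4-byte big-endian schema ID, then the JSON body. The converter reads that ID and has to fetch the matching schema from Schema Registry before it can deserialize the record, so a registry it cannot reach shows up as a deserialization error even when the data itself is fine. A minimal Python sketch of that framing (the helper name split_confluent_frame is hypothetical, not part of any Confluent library):

```python
import json
import struct

# Schema Registry wire format: magic byte 0, 4-byte big-endian schema ID, then the payload.
MAGIC_BYTE = 0


def split_confluent_frame(raw: bytes) -> tuple[int, bytes]:
    """Return (schema_id, payload) for a Schema Registry-framed record value."""
    if len(raw) < 5 or raw[0] != MAGIC_BYTE:
        raise ValueError("not a Schema Registry-framed message")
    (schema_id,) = struct.unpack(">i", raw[1:5])  # signed 32-bit int, as on the JVM side
    return schema_id, raw[5:]


# Build a sample value shaped like the REROUTES_TABLE record printed below (schema ID 30).
payload = json.dumps({"STEP_CNT": 1, "TOT_LEN": 0.0013573977968634994}).encode()
record = bytes([MAGIC_BYTE]) + struct.pack(">i", 30) + payload

schema_id, body = split_confluent_frame(record)
print(schema_id)  # 30
print(json.loads(body))
```

Only after the lookup of ID 30 succeeds does the converter parse the JSON body, which is why a network failure surfaces here rather than as a connector startup error.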

The connector is created with the following command:

CREATE SINK CONNECTOR SINK_ELASTIC_TEST_JSON_A WITH (
  'connector.class'         = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url'          = 'http://elasticsearch:9200',
  'key.converter'           = 'org.apache.kafka.connect.storage.StringConverter',
  'value.converter'         = 'io.confluent.connect.json.JsonSchemaConverter',
  'value.converter.schema.registry.url' = 'http://localhost:8081',
  'value.converter.schemas.enable' = 'true',
  'type.name'               = '_doc',
  'topics'                  = 'REROUTES_TABLE',
  'key.ignore'              = 'false',
  'schema.ignore'           = 'false'
  );

The data looks like this:

ksql> print REROUTES_TABLE from beginning limit 1;

Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON_SR or KAFKA_STRING
rowtime: 2021/12/26 06:22:33.726 Z, key: 0, value: {"STEP_CNT":1,"TOT_LEN":0.0013573977968634994}, partition: 0
Topic printing ceased

The schema of the topic value is:

{"subject":"REROUTES_TABLE-value","version":1,"id":30,"schemaType":"JSON","schema":"{\"type\":\"object\",\"properties\":{\"STEP_CNT\":{\"connect.index\":0,\"oneOf\":[{\"type\":\"null\"},{\"type\":\"integer\",\"connect.type\":\"int64\"}]},\"TOT_LEN\":{\"connect.index\":1,\"oneOf\":[{\"type\":\"null\"},{\"type\":\"number\",\"connect.type\":\"float64\"}]}}}"}

REROUTES_TABLE is built on a stream and performs some aggregations on the stream data.

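I suspect there is a null value that the deserializer can't handle, but since REROUTES_TABLE is able to run aggregations over the stream, how and where would a null come from? More importantly, how can this be fixed (even if my assumption about nulls is wrong)?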

The problem was in my connector configuration:

'value.converter.schema.registry.url' = 'http://localhost:8081',

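when it should have been:

'value.converter.schema.registry.url' = 'http://schema-registry:8081',

The converter runs inside the Kafka Connect worker, where localhost refers to the worker's own container rather than to Schema Registry, so fetching schema ID 30 failed with "Connection refused". The value has nothing to do with nulls in the data. schema-registry is presumably the Schema Registry service's hostname on the shared Docker network, in the same way that elasticsearch is used in 'connection.url'.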
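One way to confirm which registry URL works is to request the schema by ID from the same network location as the Connect worker. Schema Registry serves this at GET /schemas/ids/{id}. A minimal sketch, assuming Python is available where you run it and that the hostnames match the Docker setup above; the function names are hypothetical:

```python
import json
import urllib.request


def schema_lookup_url(registry_url: str, schema_id: int) -> str:
    """Build the Schema Registry REST URL that fetches a schema by its global ID."""
    return f"{registry_url.rstrip('/')}/schemas/ids/{schema_id}"


def fetch_schema(registry_url: str, schema_id: int, timeout: float = 5.0) -> dict:
    """Fetch a schema by ID.

    Raises urllib.error.URLError (a subclass of OSError), e.g. on "Connection refused",
    when the registry cannot be reached from where this runs.
    """
    url = schema_lookup_url(registry_url, schema_id)
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def check(registry_url: str, schema_id: int = 30) -> str:
    """Return a one-line reachability report instead of raising."""
    try:
        schema = fetch_schema(registry_url, schema_id)
        return f"{registry_url}: OK ({schema.get('schemaType', 'AVRO')})"
    except OSError as exc:
        return f"{registry_url}: FAILED ({exc})"
```

Run from inside the Connect container, check("http://localhost:8081") would be expected to report a failure and check("http://schema-registry:8081") to report JSON, mirroring the fix above.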