Kafka Avro Producer (kafka-avro-console-producer) schema error when sending to kafka connect
I am trying to publish a message with kafka-avro-console-producer that has both a key (with a schema) and a value (with a schema). The Kafka environment (Confluent 6.2.0 builds of Kafka, Connect, ZooKeeper, and Schema Registry) starts up normally, and I can confirm that my connector is installed. The problem is that when I send the message, my sink connector fails with an error I cannot diagnose.
Any help is appreciated.
I produce an AVRO message as follows:
docker exec -it schema-registry \
/usr/bin/kafka-avro-console-producer \
--broker-list http://kafka:9092 \
--topic source-1 \
--property value.schema='{"type":"record","name":"somerecord","fields":[{"name":"timestamp","type":"string"}, {"name":"data","type":"string"}]}' \
--property parse.key=true \
--property key.schema='{"type":"int"}' \
--property key.separator=" "
1 {"timestamp":"some-timestamp", "data":"somedata"}
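For reference, what this command actually registers can be checked directly against the Schema Registry (a quick check, assuming the default TopicNameStrategy subject names and the registry published on localhost:8081):

    # inspect the key and value schemas registered for topic source-1
    curl -s http://localhost:8081/subjects/source-1-key/versions/latest
    curl -s http://localhost:8081/subjects/source-1-value/versions/latest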
and I receive the following error in the Connect logs:
connect | [2021-10-04 18:22:51,792] ERROR WorkerSinkTask{id=brodagroupsoftware-http-sink-connector-0} Error converting message key in topic 'source-1' partition 0 at offset 0 and timestamp 1633371770674: Converting byte[] to Kafka Connect data failed due to serialization error of topic source-1: (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect | org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic source-1:
connect | at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:118)
connect | at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:530)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:493)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:493)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
connect | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect | at java.base/java.lang.Thread.run(Thread.java:829)
connect | Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id 1
connect | Caused by: java.io.IOException: Invalid schema "long" with refs [] of type AVRO
connect | at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.lambda$getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:229)
connect | at java.base/java.util.Optional.orElseThrow(Optional.java:408)
connect | at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:227)
connect | at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:298)
connect | at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:283)
connect | at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:107)
connect | at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:208)
connect | at io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:163)
connect | at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:107)
connect | at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:530)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:493)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:493)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
connect | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect | at java.base/java.lang.Thread.run(Thread.java:829)
connect | [2021-10-04 18:22:51,796] INFO HttpSinkTask:flush (com.brodagroup.datamesh.connect.httpsinkconnector.HttpSinkTask)
My docker-compose starts "connect" as shown below:
connect:
  image: confluentinc/cp-kafka-connect:6.2.0
  hostname: connect
  container_name: connect
  depends_on:
    - zookeeper
    - kafka
  ports:
    - 8083:8083
  environment:
    CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
    CONNECT_REST_PORT: 8083
    CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
    CONNECT_GROUP_ID: compose-connect-group
    CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
    CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
    CONNECT_STATUS_STORAGE_TOPIC: connect-status
    CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "true"
    CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
    CONNECT_KEY_CONVERTER: "io.confluent.connect.json.JsonSchemaConverter"
    CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
    CONNECT_VALUE_CONVERTER: "io.confluent.connect.json.JsonSchemaConverter"
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
    CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
    CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
    CONNECT_PLUGIN_PATH: '/usr/share/java'
Based on the stack trace, you are using JsonSchemaConverter, but you have produced Avro data. When the sink task deserializes the record key, the JsonSchemaConverter looks up schema id 1 in the Schema Registry, finds a schema of type AVRO, and fails because it cannot parse it as a JSON Schema (the "Invalid schema ... of type AVRO" cause in the trace).
You should therefore use AvroConverter in your Connect configuration.
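A minimal sketch of that change in the docker-compose service above (assuming the stock cp-kafka-connect image, which bundles the Avro converter under /usr/share/java):

    CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
    CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
    CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"

The same override can instead be applied to a single connector by setting key.converter, key.converter.schema.registry.url, value.converter, and value.converter.schema.registry.url in that connector's configuration. Note also that the CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE / CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE settings apply to the plain JsonConverter and can be dropped once AvroConverter is in use.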