使用 Kafka Connect 读取 AVRO 编码消息(由 KSQL 流创建)时出现问题

Issues reading AVRO encoded messages (created by KSQL stream) with Kafka Connect

当我们通过 KSQL 创建 AVRO 消息并尝试使用 Kafka Connect 使用它们时,发生了一些奇怪的事情。一些上下文:

源数据 第 3 方提供商正在我们的一个 Kafka 集群上生成数据 JSON(到目前为止,还不错)。我们实际上看到了进来的数据。

数据转换 由于我们的内部系统需要在 AVRO 中对数据进行编码,因此我们创建了一个 KSQL 集群,通过在 KSQL 中创建以下流将传入数据转换为 AVRO:

{
    "ksql": "
        CREATE STREAM src_stream (browser_name VARCHAR)
        WITH (KAFKA_TOPIC='json_topic', VALUE_FORMAT='JSON');

        CREATE STREAM sink_stream WITH (KAFKA_TOPIC='avro_topic',VALUE_FORMAT='AVRO',  PARTITIONS=1, REPLICAS=3) AS
        SELECT * FROM src_stream;
    ",
    "streamsProperties": {
        "ksql.streams.auto.offset.reset": "earliest"
    }
}

(到目前为止,还不错)

我们看到随着偏移量的增加,数据从 JSON 主题生成到 AVRO 主题。

然后我们在(新的)Kafka Connect 集群中创建一个 Kafka 连接器。作为某些上下文,我们正在使用多个 Kafka Connect 集群(这些集群具有相同的属性),因此我们有一个 Kafka Connect 集群 运行 用于此数据,但集群的精确副本用于其他 AVRO 数据(1 个用于分析,1 个用于我们的业务数据)。

此连接器的接收器是 BigQuery,我们使用的是 Wepay BigQuery Sink Connector 1.2.0。再次,到目前为止,一切都很好。我们的业务集群 运行 使用此连接器很好,业务集群上的 AVRO 主题正在流入 BigQuery。

然而,当我们尝试使用之前由 KSQL 语句创建的 AVRO 主题时,我们看到抛出异常:/

例外情况如下:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: dpt_video_event-created_v2
 at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:98)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord[=11=](WorkerSinkTask.java:510)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
 ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 0
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
 at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:209)
 at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:235)
 at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:415)
 at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:408)
 at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:123)
 at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:190)
 at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:169)
 at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
 at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:243)
 at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:134)
 at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:85)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord[=11=](WorkerSinkTask.java:510)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

这对我们来说表明 Kafka Connect 正在读取消息、解码 AVRO 并尝试从模式注册表中获取 ID 为 0 的模式。显然,模式注册表中的模式 ID 始终 > 0。

我们目前正试图找出这里的问题。看起来 KSQL 正在使用模式 ID 0 对消息进行编码,但我们无法找到原因:/

感谢任何帮助!

BR, 帕特里克

更新: 我们已经为 AVRO 消息实现了一个基本的消费者,并且该消费者正确识别了 AVRO 消息中的模式(ID:3),因此它似乎被重新命名为 Kafka Connect,而不是实际的 KSQL / AVRO 消息。

Obviously, schema IDs in the schema registry are always > 0... It looks like KSQL is encoding the message with schema ID 0, but we're unable to find the cause for that

AvroConverter 做了一个 "dumb check",它只看起来消耗的字节以 0x0 的魔术字节开头。接下来的 4 个字节是 ID。

如果您使用的是 key.converter=AvroConverter 并且您的密钥以十六进制格式 0x00000 开头,则 ID 在日志中将显示为 0,并且查找将失败。

我最后检查过,KSQL 不会以 Avro 格式输出密钥,因此您需要检查连接器的属性。