为融合的 kafka-connect-s3 实现自定义 AvroConverter

Implementing custom AvroConverter for confluent kafka-connect-s3

我正在使用 Confluent's Kafka s3 connect 将数据从 apache Kafka 复制到 AWS S3。

问题是我有 AVRO 格式的 Kafka 数据,它没有使用 Confluent Schema Registry 的 Avro 序列化程序,我无法更改 Kafka 生产者。所以我需要反序列化来自 Kafka 的现有 Avro 数据,然后在 AWS S3 中以镶木地板格式保存相同的数据。我试过像这样使用 confluent 的 AvroConverter 作为值转换器 -

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost/api/v1/avro

我收到这个错误 -

Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic dcp-all to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:488)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

据我所知,"io.confluent.connect.avro.AvroConverter" 只有在使用 Confluent Schema Registry 的 Avro 序列化程序在 Kafka 中写入数据时才有效,因此我收到此错误。所以我的问题是在这种情况下我需要实现一个通用的 AvroConverter 吗?如果是,我该如何扩展现有的源代码 - https://github.com/confluentinc/kafka-connect-storage-cloud?

如有任何帮助,我们将不胜感激。

您不需要扩展该存储库。你只需要 implement a Converter (part of Apache Kafka) shade it into a JAR, then place it on your Connect worker's CLASSPATH, like BlueApron did for Protobuf

或者看看这是否有效 - https://github.com/farmdawgnation/registryless-avro-converter


NOT using Confluent Schema Registry

那么您使用的是什么注册表?我所知道的每一个都有与 Confluent 接口的配置