将 s3 连接器与用于 kafka 的 landoop docker 容器一起使用时出错

Error using s3 connector with landoop docker container for kafka

使用以下配置创建接收器连接器时

connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=us-west-2
topics.dir=topics
flush.size=3
schema.compatibility=NONE
topics=my_topic
tasks.max=1
s3.part.size=5242880
format.class=io.confluent.connect.s3.format.avro.AvroFormat
# added after comment 
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=my-bucket

和运行它我得到以下错误

org.apache.kafka.connect.errors.DataException: coyote-test-avro
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 91319
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:192)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:394)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:387)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:138)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:121)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:84)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

但是我的主题有一个模式,正如我使用 docker 容器 landoop/fast-data-dev 提供的 UI 看到的那样。即使我尝试将原始数据写入 s3 更改以下配置

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
storage.class=io.confluent.connect.s3.storage.S3Storage
schema.compatibility=NONE

并删除 schema.generator.class,出现同样的错误,尽管根据我的理解这不应该使用 avro 模式。

为了能够写入 s3,我在我的容器中设置了环境变量 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY,但无论如何问题似乎在那之前就出现了。

我想版本可能有问题,如上所述我在docker-machine中使用容器landoop/fast-data-dev(它在mac native docker machine) 和 produce 和 consumer 工作完美。 这是关于部分

我查看了连接日志,但找不到任何有用的信息,但是如果你能告诉我应该查找什么,我会添加相关行(所有日志都太大)

每个主题消息都必须编码为 Avro,如架构注册表所指定的那样。

转换器查看原始 Kafka 数据(键和值)的字节 2-5,转换为整数(在您的情况下,错误中的 ID),并查找注册表。

如果不是 Avro 或其他错误数据,您会收到此处的错误或关于 invalid magic byte 的错误。

并且此错误不是连接错误。如果添加 print-key 属性,您可以使用 Avro 控制台消费者重现它。

假设是这种情况,一种解决方案是更改密钥 serde 以使用字节数组反序列化器,这样它就可以跳过 avro 查找

否则,由于无法删除 Kafka 中的消息,这里唯一的解决方案是找出生产者发送错误数据的原因,修复它们,然后将连接消费者组移动到具有有效数据的最新偏移量,等待使无效数据在该主题上过期,或完全移至新主题