将 s3 连接器与用于 kafka 的 landoop docker 容器一起使用时出错
Error using s3 connector with landoop docker container for kafka
使用以下配置创建接收器连接器时
connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=us-west-2
topics.dir=topics
flush.size=3
schema.compatibility=NONE
topics=my_topic
tasks.max=1
s3.part.size=5242880
format.class=io.confluent.connect.s3.format.avro.AvroFormat
# added after comment
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=my-bucket
和运行它我得到以下错误
org.apache.kafka.connect.errors.DataException: coyote-test-avro
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 91319
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:192)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:394)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:387)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:138)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:121)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:84)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
但是我的主题有一个模式,正如我使用 docker 容器 landoop/fast-data-dev
提供的 UI 看到的那样。即使我尝试将原始数据写入 s3 更改以下配置
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
storage.class=io.confluent.connect.s3.storage.S3Storage
schema.compatibility=NONE
并删除 schema.generator.class
,出现同样的错误,尽管根据我的理解这不应该使用 avro 模式。
为了能够写入 s3,我在我的容器中设置了环境变量 AWS_ACCESS_KEY_ID
和 AWS_SECRET_ACCESS_KEY
,但无论如何问题似乎在那之前就出现了。
我想版本可能有问题,如上所述我在docker-machine中使用容器landoop/fast-data-dev
(它在mac native docker machine) 和 produce 和 consumer 工作完美。
这是关于部分
我查看了连接日志,但找不到任何有用的信息,但是如果你能告诉我应该查找什么,我会添加相关行(所有日志都太大)
每个主题消息都必须编码为 Avro,如架构注册表所指定的那样。
转换器查看原始 Kafka 数据(键和值)的字节 2-5,转换为整数(在您的情况下,错误中的 ID),并查找注册表。
如果不是 Avro 或其他错误数据,您会收到此处的错误或关于 invalid magic byte
的错误。
并且此错误不是连接错误。如果添加 print-key
属性,您可以使用 Avro 控制台消费者重现它。
假设是这种情况,一种解决方案是更改密钥 serde 以使用字节数组反序列化器,这样它就可以跳过 avro 查找
否则,由于无法删除 Kafka 中的消息,这里唯一的解决方案是找出生产者发送错误数据的原因,修复它们,然后将连接消费者组移动到具有有效数据的最新偏移量,等待使无效数据在该主题上过期,或完全移至新主题
使用以下配置创建接收器连接器时
connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=us-west-2
topics.dir=topics
flush.size=3
schema.compatibility=NONE
topics=my_topic
tasks.max=1
s3.part.size=5242880
format.class=io.confluent.connect.s3.format.avro.AvroFormat
# added after comment
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=my-bucket
和运行它我得到以下错误
org.apache.kafka.connect.errors.DataException: coyote-test-avro
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 91319
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:192)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:394)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:387)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:138)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:121)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:84)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
但是我的主题有一个模式,正如我使用 docker 容器 landoop/fast-data-dev
提供的 UI 看到的那样。即使我尝试将原始数据写入 s3 更改以下配置
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
storage.class=io.confluent.connect.s3.storage.S3Storage
schema.compatibility=NONE
并删除 schema.generator.class
,出现同样的错误,尽管根据我的理解这不应该使用 avro 模式。
为了能够写入 s3,我在我的容器中设置了环境变量 AWS_ACCESS_KEY_ID
和 AWS_SECRET_ACCESS_KEY
,但无论如何问题似乎在那之前就出现了。
我想版本可能有问题,如上所述我在docker-machine中使用容器landoop/fast-data-dev
(它在mac native docker machine) 和 produce 和 consumer 工作完美。
这是关于部分
我查看了连接日志,但找不到任何有用的信息,但是如果你能告诉我应该查找什么,我会添加相关行(所有日志都太大)
每个主题消息都必须编码为 Avro,如架构注册表所指定的那样。
转换器查看原始 Kafka 数据(键和值)的字节 2-5,转换为整数(在您的情况下,错误中的 ID),并查找注册表。
如果不是 Avro 或其他错误数据,您会收到此处的错误或关于 invalid magic byte
的错误。
并且此错误不是连接错误。如果添加 print-key
属性,您可以使用 Avro 控制台消费者重现它。
假设是这种情况,一种解决方案是更改密钥 serde 以使用字节数组反序列化器,这样它就可以跳过 avro 查找
否则,由于无法删除 Kafka 中的消息,这里唯一的解决方案是找出生产者发送错误数据的原因,修复它们,然后将连接消费者组移动到具有有效数据的最新偏移量,等待使无效数据在该主题上过期,或完全移至新主题