将 Glue 模式注册表与 MSK 连接器结合使用

Using Glue schema registry with MSK Connector

我一直在尝试创建一个 MSK 连接器并使用 Glue 模式注册表。 配置如下

connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=eu-west-1
topics.dir=topics/dir
flush.size=200
tasks.max=2
s3.part.size=5242880
timezone=GMT
# value.converter.schema.registry.url=http://someIP:8081
key.converter.schemaName=my-topic-schema
locale=US
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
value.converter.schemaName=my-topic-schema
value.converter=io.confluent.connect.avro.StringConverter
s3.bucket.name=my-bucket
key.converter=io.confluent.connect.avro.StringConverter
# key.converter.schema.registry.url==http://someIP:8081
partition.duration.ms=3600000
schema.compatibility=BACKWARD
topics=osb
value.converter.registry.name=Glue-Schema-Registry
key.converter.registry.name=Glue-Schema-Registry
key.converter.schemas.enable=true
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
value.converter.schemas.enable=true
storage.class=io.confluent.connect.s3.storage.S3Storage
rotate.schedule.interval.ms=0
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
timestamp.extractor=RecordField
timestamp.field=timestamp

首先,我在 EC2 上使用融合架构注册表 运行,我在“key/value.converter.schema.registry.url”字段中添加了其 ip,它工作正常。现在我正在尝试使用 Glue Schema Registry。但是我不知道如何将连接器连接到 Glue Schema Registry。

这些类不存在

key.converter=io.confluent.connect.avro.StringConverter
value.converter=io.confluent.connect.avro.StringConverter

StringConverter 类名以 org.apache.kafka

开头

同样,看起来您添加了一堆随机转换器属性,除了 url

之外,这些属性对 String 或 Confluent Avro 转换器无效

要使用 Glue,您需要使用 AWSKafkaAvroConverter,它是此存储库的一部分,

https://github.com/awslabs/aws-glue-schema-registry/tree/master/avro-kafkaconnect-converter

并在此处记录

https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-apache-kafka-connect