将 Glue 模式注册表与 MSK 连接器结合使用
Using Glue schema registry with MSK Connector
我一直在尝试创建一个 MSK 连接器并使用 Glue 模式注册表。
配置如下
connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=eu-west-1
topics.dir=topics/dir
flush.size=200
tasks.max=2
s3.part.size=5242880
timezone=GMT
# value.converter.schema.registry.url=http://someIP:8081
key.converter.schemaName=my-topic-schema
locale=US
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
value.converter.schemaName=my-topic-schema
value.converter=io.confluent.connect.avro.StringConverter
s3.bucket.name=my-bucket
key.converter=io.confluent.connect.avro.StringConverter
# key.converter.schema.registry.url==http://someIP:8081
partition.duration.ms=3600000
schema.compatibility=BACKWARD
topics=osb
value.converter.registry.name=Glue-Schema-Registry
key.converter.registry.name=Glue-Schema-Registry
key.converter.schemas.enable=true
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
value.converter.schemas.enable=true
storage.class=io.confluent.connect.s3.storage.S3Storage
rotate.schedule.interval.ms=0
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
timestamp.extractor=RecordField
timestamp.field=timestamp
首先,我在 EC2 上使用融合架构注册表 运行,我在“key/value.converter.schema.registry.url”字段中添加了其 ip,它工作正常。现在我正在尝试使用 Glue Schema Registry。但是我不知道如何将连接器连接到 Glue Schema Registry。
这些类不存在
key.converter=io.confluent.connect.avro.StringConverter
value.converter=io.confluent.connect.avro.StringConverter
StringConverter
类名以 org.apache.kafka
开头
同样,看起来您添加了一堆随机转换器属性,除了 url
之外,这些属性对 String 或 Confluent Avro 转换器无效
要使用 Glue,您需要使用 AWSKafkaAvroConverter
,它是此存储库的一部分,
https://github.com/awslabs/aws-glue-schema-registry/tree/master/avro-kafkaconnect-converter
并在此处记录
我一直在尝试创建一个 MSK 连接器并使用 Glue 模式注册表。 配置如下
connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=eu-west-1
topics.dir=topics/dir
flush.size=200
tasks.max=2
s3.part.size=5242880
timezone=GMT
# value.converter.schema.registry.url=http://someIP:8081
key.converter.schemaName=my-topic-schema
locale=US
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
value.converter.schemaName=my-topic-schema
value.converter=io.confluent.connect.avro.StringConverter
s3.bucket.name=my-bucket
key.converter=io.confluent.connect.avro.StringConverter
# key.converter.schema.registry.url==http://someIP:8081
partition.duration.ms=3600000
schema.compatibility=BACKWARD
topics=osb
value.converter.registry.name=Glue-Schema-Registry
key.converter.registry.name=Glue-Schema-Registry
key.converter.schemas.enable=true
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
value.converter.schemas.enable=true
storage.class=io.confluent.connect.s3.storage.S3Storage
rotate.schedule.interval.ms=0
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
timestamp.extractor=RecordField
timestamp.field=timestamp
首先,我在 EC2 上使用融合架构注册表 运行,我在“key/value.converter.schema.registry.url”字段中添加了其 ip,它工作正常。现在我正在尝试使用 Glue Schema Registry。但是我不知道如何将连接器连接到 Glue Schema Registry。
这些类不存在
key.converter=io.confluent.connect.avro.StringConverter
value.converter=io.confluent.connect.avro.StringConverter
StringConverter
类名以 org.apache.kafka
同样,看起来您添加了一堆随机转换器属性,除了 url
之外,这些属性对 String 或 Confluent Avro 转换器无效要使用 Glue,您需要使用 AWSKafkaAvroConverter
,它是此存储库的一部分,
https://github.com/awslabs/aws-glue-schema-registry/tree/master/avro-kafkaconnect-converter
并在此处记录