将 Confluent Schema Registry 与 MSK 结合使用

Using Confluent Schema Registry with MSK

是否可以将 Confluent Schema Registry 与 AWS MSK 集成? 如果您以前这样做过,能否请您提供一些您遵循的指导/博客来实现它?

这是可能的,并且与在常规 Kafka 安装中使用它没有什么不同。

您将其 bootstrap 服务器 属性 指向 MSK,并将客户端应用程序指向它。

有可能。我的设置使用 ec2 和 docker.

  1. 如果您使用基于 IAM 的身份验证,请下载 IAM 身份验证 jar
mkdir -p /usr/share/java/aws
wget -P /usr/share/java/aws https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar
chmod -R 444 /usr/share/java/aws
  1. 使用 confluent 官方 docker 图像进行模式注册
...

  schema-registry:
    image: confluentinc/cp-schema-registry:5.4.6-1-ubi8
    hostname: schema-registry
    container_name: schema-registry
    ports:
      - "8081:8081"
    volumes:
      - /usr/share/java/aws/aws-msk-iam-auth-1.1.1-all.jar:/usr/share/java/cp-base-new/aws-msk-iam-auth-1.1.1-all.jar
      - /usr/share/java/aws/aws-msk-iam-auth-1.1.1-all.jar:/usr/share/java/rest-utils/aws-msk-iam-auth-1.1.1-all.jar
    environment: # https://docs.confluent.io/platform/current/schema-registry/installation/config.html#schemaregistry-config
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_HOST_NAME: "${HOSTNAME}" # 
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "${BOOTSTRAP_BROKERS_SASL_IAM}"
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: "SASL_SSL"
      SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM: "AWS_MSK_IAM"
      SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG: "software.amazon.msk.auth.iam.IAMLoginModule required awsDebugCreds=true;"
      SCHEMA_REGISTRY_KAFKASTORE_SASL_CLIENT_CALLBACK_HANDLER_CLASS: "software.amazon.msk.auth.iam.IAMClientCallbackHandler"

...
  • HOSTNAME 是您的 ec2 机器 DNS 名称或 IP,例如 ip-10-0-0-84.ec2.internal
  • BOOTSTRAP_BROKERS_SASL_IAM 是逗号分隔的 host1:port,host2:port 网址。对于端口信息see this

如果您使用 PLAINTEXT 或 SSL 身份验证,最后 4 个环境变量会更改。而且你不必下载 iam auth jar

  1. 使用这些配置源或接收器连接器 属性
...
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://ip-10-0-0-84.ec2.internal:8081
value.converter.enhanced.avro.schema.support=true

就是这样。
在您的 EC2 实例的安全组中为 MSK 集群

打开 8081 端口

资源:


我试过的替代选项是AWS Glue Schema registry 但是我们不得不使用 KSQL,而 KSQL 没有第三方架构注册表集成或自定义 SerDe Github issue