将 Confluent Schema Registry 与 MSK 结合使用
Using Confluent Schema Registry with MSK
是否可以将 Confluent Schema Registry 与 AWS MSK 集成?
如果您以前这样做过,能否请您提供一些您遵循的指导/博客来实现它?
这是可能的,并且与在常规 Kafka 安装中使用它没有什么不同。
您将其 bootstrap 服务器 属性 指向 MSK,并将客户端应用程序指向它。
有可能。我的设置使用 ec2 和 docker.
- 如果您使用基于 IAM 的身份验证,请下载 IAM 身份验证 jar
mkdir -p /usr/share/java/aws
wget -P /usr/share/java/aws https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar
chmod -R 444 /usr/share/java/aws
- 使用 confluent 官方 docker 图像进行模式注册
...
schema-registry:
image: confluentinc/cp-schema-registry:5.4.6-1-ubi8
hostname: schema-registry
container_name: schema-registry
ports:
- "8081:8081"
volumes:
- /usr/share/java/aws/aws-msk-iam-auth-1.1.1-all.jar:/usr/share/java/cp-base-new/aws-msk-iam-auth-1.1.1-all.jar
- /usr/share/java/aws/aws-msk-iam-auth-1.1.1-all.jar:/usr/share/java/rest-utils/aws-msk-iam-auth-1.1.1-all.jar
environment: # https://docs.confluent.io/platform/current/schema-registry/installation/config.html#schemaregistry-config
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
SCHEMA_REGISTRY_HOST_NAME: "${HOSTNAME}" #
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "${BOOTSTRAP_BROKERS_SASL_IAM}"
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: "SASL_SSL"
SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM: "AWS_MSK_IAM"
SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG: "software.amazon.msk.auth.iam.IAMLoginModule required awsDebugCreds=true;"
SCHEMA_REGISTRY_KAFKASTORE_SASL_CLIENT_CALLBACK_HANDLER_CLASS: "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
...
HOSTNAME
是您的 ec2 机器 DNS 名称或 IP,例如 ip-10-0-0-84.ec2.internal
BOOTSTRAP_BROKERS_SASL_IAM
是逗号分隔的 host1:port,host2:port
网址。对于端口信息see this
如果您使用 PLAINTEXT 或 SSL 身份验证,最后 4 个环境变量会更改。而且你不必下载 iam auth jar
- 使用这些配置源或接收器连接器 属性
...
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://ip-10-0-0-84.ec2.internal:8081
value.converter.enhanced.avro.schema.support=true
就是这样。
在您的 EC2 实例的安全组中为 MSK 集群
打开 8081
端口
资源:
- https://github.com/maikelpenz/kafka-aws-deployment
- https://maikelpenz.medium.com/building-a-kafka-playground-on-aws-part-3-streaming-database-events-and-querying-with-ksql-5f23978a0080
我试过的替代选项是AWS Glue Schema registry
但是我们不得不使用 KSQL,而 KSQL 没有第三方架构注册表集成或自定义 SerDe Github issue
是否可以将 Confluent Schema Registry 与 AWS MSK 集成? 如果您以前这样做过,能否请您提供一些您遵循的指导/博客来实现它?
这是可能的,并且与在常规 Kafka 安装中使用它没有什么不同。
您将其 bootstrap 服务器 属性 指向 MSK,并将客户端应用程序指向它。
有可能。我的设置使用 ec2 和 docker.
- 如果您使用基于 IAM 的身份验证,请下载 IAM 身份验证 jar
mkdir -p /usr/share/java/aws
wget -P /usr/share/java/aws https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar
chmod -R 444 /usr/share/java/aws
- 使用 confluent 官方 docker 图像进行模式注册
...
schema-registry:
image: confluentinc/cp-schema-registry:5.4.6-1-ubi8
hostname: schema-registry
container_name: schema-registry
ports:
- "8081:8081"
volumes:
- /usr/share/java/aws/aws-msk-iam-auth-1.1.1-all.jar:/usr/share/java/cp-base-new/aws-msk-iam-auth-1.1.1-all.jar
- /usr/share/java/aws/aws-msk-iam-auth-1.1.1-all.jar:/usr/share/java/rest-utils/aws-msk-iam-auth-1.1.1-all.jar
environment: # https://docs.confluent.io/platform/current/schema-registry/installation/config.html#schemaregistry-config
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
SCHEMA_REGISTRY_HOST_NAME: "${HOSTNAME}" #
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "${BOOTSTRAP_BROKERS_SASL_IAM}"
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: "SASL_SSL"
SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM: "AWS_MSK_IAM"
SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG: "software.amazon.msk.auth.iam.IAMLoginModule required awsDebugCreds=true;"
SCHEMA_REGISTRY_KAFKASTORE_SASL_CLIENT_CALLBACK_HANDLER_CLASS: "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
...
HOSTNAME
是您的 ec2 机器 DNS 名称或 IP,例如ip-10-0-0-84.ec2.internal
BOOTSTRAP_BROKERS_SASL_IAM
是逗号分隔的host1:port,host2:port
网址。对于端口信息see this
如果您使用 PLAINTEXT 或 SSL 身份验证,最后 4 个环境变量会更改。而且你不必下载 iam auth jar
- 使用这些配置源或接收器连接器 属性
...
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://ip-10-0-0-84.ec2.internal:8081
value.converter.enhanced.avro.schema.support=true
就是这样。
在您的 EC2 实例的安全组中为 MSK 集群
8081
端口
资源:
- https://github.com/maikelpenz/kafka-aws-deployment
- https://maikelpenz.medium.com/building-a-kafka-playground-on-aws-part-3-streaming-database-events-and-querying-with-ksql-5f23978a0080
我试过的替代选项是AWS Glue Schema registry 但是我们不得不使用 KSQL,而 KSQL 没有第三方架构注册表集成或自定义 SerDe Github issue