在 Confluent conn 失败时通过 HTTPS 进行 Kafka Schema Registry

Kafka Schema Registry via HTTPS on Confluent conn failure

由于身份验证问题,我无法在架构注册表中注册 Avro 架构。

我已经设置了一个 Confluent Cloud 集群并通过 UI 为主题定义了一个 avro 模式。我还通过 UI.

设置了 Api 键

我已经确认我可以使用以下 curl 查询 subjects - curl -u keyid:secretkey https://schema-reg-url/subjects。 所以我用的API键应该没问题。

我也尝试过设置具有正确属性的 RestService(如下),但我似乎仍然无法连接到架构注册表。

我查看了 SchemaRegistryClient 的源代码,但似乎没有指定身份验证参数的选项。

我走错路了吗?

注意:我指定了以下属性,因为这些是 Confluent API 访问中建议的内容页。

val rs1: RestService = new RestService("<https://schema-registry-url>")
val props = new util.HashMap[String, String]()
props.put("basic.auth.credentials.source", "USER_INFO")
props.put("schema.registry.basic.auth.user.info", "key_id:secret_key_id")
props.put("schema.registry.url", "https://schema-registry-url")

// this fails
rs1.registerSchema(props, RegisterSchemaRequest.fromJson(schemaString), "<subject name>")

// this fails as well
val listOfSubjects: util.List[String] = rs1.getAllSubjects(props)

我得到的错误如下。

Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: sun.net.www.protocol.http.HttpURLConnection$HttpInputStream@7507d96c; line: 1, column: 2]; error code: 50005
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: sun.net.www.protocol.http.HttpURLConnection$HttpInputStream@7507d96c; line: 1, column: 2]; error code: 50005

更新:我做了进一步的分析。
上面的错误不是实际错误 - 上面的错误发生是因为 jsonDeserializer.readValue() 的调用失败,因为 Exception object 没有传递给它(参考: line in source code

实际错误是 401 HTTP_UNAUTHORIZED 错误。

与架构注册表的连接使用基本身份验证进行授权。 REST GET 调用需要在 header 中包含编码的 API Key:Pwd。

下面答案中的工作代码片段。

我从 Confluent 支持那里得到了一些帮助 - Schema Registry 使用 Basic Auth 进行授权,API Keys+Pwd 需要在 Header 中作为 "Authorization":"Basic base64encoded(api-key-username:pwd)"

我想我需要根据 authorization/authentication 惯例来学习我的知识。

工作代码片段对我有用。

val rs1: RestService = new RestService(s"${testKafkaSchemaRegistryURL}")
val headers = new util.HashMap[String, String]()
  headers.put("Authorization","Basic " + util.Base64.getEncoder().encodeToString(s"${testKafkaSchemaRegistryAccessKey}:${testKafkaSchemaRegistrySecretAccessKey}".getBytes()))

// works now
val allSubjects = rs1.getAllSubjects(headers)

// works now
val schConfig: Schema = rs1.getLatestVersion(headers, "kev-test-1-value")
schContent.toString().parseJson