JSONConverter 的 Kafka Connect 架构格式

Kafka Connect schema format for JSONConverter

我正在使用 Kafka Connect 从模式注册表中检索现有模式,然后尝试使用 JSONConverter (org.apache.kafka.connect.json.JSONConverter) 转换返回的模式字符串。

不幸的是,我从 JSONConverter 收到一个错误:

org.apache.kafka.connect.errors.DataException: Unknown schema type: object

我查看了 JSONConverter 代码,出现错误是因为从架构注册表返回的架构“类型”是“对象”(见下文),但 JSONConverter 无法识别该类型。

问题:

  1. 检索到的模式是否可用于 JSONConverter?如果是,我是不是用错了?
  2. JSONConverter 需要不同的格式吗?如果是,有人知道 JSONConverter 期望的格式是什么吗?
  3. 是否有不同的方法将架构注册表响应整合到“架构”中?

以下是相关的工件:

模式注册表响应(查询特定模式时):

[{"subject":"test-schema","version":1,"id":1,"schemaType":"JSON","schema":"{\"title\":\"test-schema\",\"type\":\"object\",\"required\":[\"id\"],\"additionalProperties\":false,\"properties\":{\"id\":{\"type\":\"integer\"}}}"}]

稍微清理一下上面的文本后,相关的架构组件(“架构”)如下所示:

{
  "title":"test-schema",
  "type":"object",
  "required":["id"],
  "additionalProperties":false,
  "properties":{"id":{"type":"integer"}}
}

org.apache.kafka.connect.json.JSONConverter 实际上并没有使用“JSONSchema”规范。它有自己的(没有很好记录的)格式。它也根本没有与架构注册表集成。

一个对象是struct类型。 - https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/#json-schemas

如果您打算使用实际的 JSONSchema(和注册表),则需要使用 Confluent 的转换器 - io.confluent.connect.json.JsonSchemaConverter

Is there a different method of concerting the schema registry response into a "Schema"

如果您使用 Schema Registry Java 客户端,那么是的,使用 getSchemaById 方法,那么该响应的 schemaType()rawSchema() 方法应该让您接近你想要的。有了它,您可以将它传递给一些 JSONSchema 库(例如 org.everit.json.schema,它由注册表使用)