Spark Confluent schema registry client - Unrecognized field "schemaType"

I am using the confluent-kafka-schemaregistry client in my Spark application. My use case is to read a JSON schema from the registry and process it in my application.

When I registered the schema, I passed a schemaType parameter with the value JSON alongside the schema parameter.
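For readers who want to reproduce the setup, here is a minimal sketch of such a registration request, sent to the registry's REST endpoint for new subject versions. The post does not say how the schema was actually registered, so the example schema, the RegisterJsonSchema object, and the escapeJson helper are all hypothetical; only the registry URL and subject name come from the question. It assumes Java 11 or later for java.net.http.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object RegisterJsonSchema {
  // Minimal escaping so the schema can be embedded as a JSON string value.
  // Hypothetical helper: it handles quotes, backslashes and common whitespace only.
  def escapeJson(s: String): String =
    s.flatMap {
      case '"'  => "\\\""
      case '\\' => "\\\\"
      case '\n' => "\\n"
      case '\r' => "\\r"
      case '\t' => "\\t"
      case c    => c.toString
    }

  // Request body: schemaType marks the schema as JSON Schema,
  // and the schema itself travels as an escaped string.
  def registrationBody(schema: String): String =
    s"""{"schemaType": "JSON", "schema": "${escapeJson(schema)}"}"""

  def main(args: Array[String]): Unit = {
    // Example schema, made up for illustration.
    val schema = """{"type": "object", "properties": {"id": {"type": "integer"}}}"""

    val request = HttpRequest
      .newBuilder(URI.create("http://localhost:8081/subjects/jsontesting1/versions"))
      .header("Content-Type", "application/vnd.schemaregistry.v1+json")
      .POST(HttpRequest.BodyPublishers.ofString(registrationBody(schema)))
      .build()

    val response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    println(s"${response.statusCode()} ${response.body()}")
  }
}
```

A schema registered this way is returned with a schemaType field when a subject version is fetched, which is the field the exception in this question complains about.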

However, I get the following exception when reading the schema in my Spark application.

[spark-listener-group-appStatus] ERROR test.JobListener - Batch failed:com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "schemaType" (class io.confluent.kafka.schemaregistry.client.rest.entities.Schema), not marked as ignorable (4 known properties: "version", "schema", "id", "subject"])
 at [Source: sun.net.www.protocol.http.HttpURLConnection$HttpInputStream@6a228f09; line: 1, column: 209] (through reference chain: io.confluent.kafka.schemaregistry.client.rest.entities.Schema["schemaType"])
    at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:62)
    at com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:855)
    at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1083)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1389)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1343)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:401)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1123)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:298)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:133)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3789)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2856)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:221)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:265)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:524)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:516)

My code is:

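import io.confluent.kafka.schemaregistry.client.rest.RestService

// Connect to the local schema registry
val restService = new RestService("http://localhost:8081")
// Fetch the latest version of the subject (this is the call that throws)
val valueRestResponseSchema = restService.getLatestVersion("jsontesting1")
val jsonSchema = valueRestResponseSchema.getSchema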

Can someone help me extract the schema from this payload?

It worked! I used restService.getLatestVersionSchemaOnly("jsontesting1") and it fetched only the schema. Thanks!
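The working call can be sketched as the small program below. The LatestSchemaExample object is a hypothetical wrapper, and the example assumes the same Confluent client jar as in the question is on the classpath and the registry is running at the URL given there. The exception lists only four known properties for the client's Schema class, which suggests the client predates the schemaType field. The schema-only call presumably works because the client no longer has to map the full response onto that class. Upgrading the client to a version that knows about schemaType would likely also make getLatestVersion work, though that was not tested in this thread.

```scala
import io.confluent.kafka.schemaregistry.client.rest.RestService

object LatestSchemaExample {
  def main(args: Array[String]): Unit = {
    // Registry URL and subject name are taken from the question.
    val restService = new RestService("http://localhost:8081")

    // Returns the schema as a plain string instead of the full
    // subject/version/id/schema entity.
    val jsonSchema: String = restService.getLatestVersionSchemaOnly("jsontesting1")

    println(jsonSchema)
  }
}
```

The returned string is the raw JSON schema, so it can be handed to whatever JSON handling the Spark job already uses.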