Spark Confluent 架构注册表客户端 - 无法识别的字段 "schemaType"
Spark Confluent schema registry client - Unrecognized field "schemaType"
我在我的 spark 应用程序中使用 confluent-kafka-schemaregistry 客户端。我的用例是我想从注册表中读取 JSON 模式并在我的应用程序中处理它。
当我注册架构时,我使用了一个参数 schemaType
和 JSON
作为值以及 schema
参数。
但是,我在读取 spark 应用程序中的架构时遇到以下异常。
[spark-listener-group-appStatus] ERROR test.JobListener - Batch failed:com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "schemaType" (class io.confluent.kafka.schemaregistry.client.rest.entities.Schema), not marked as ignorable (4 known properties: "version", "schema", "id", "subject"])
at [Source: sun.net.www.protocol.http.HttpURLConnection$HttpInputStream@6a228f09; line: 1, column: 209] (through reference chain: io.confluent.kafka.schemaregistry.client.rest.entities.Schema["schemaType"])
at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:62)
at com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:855)
at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1083)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1389)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1343)
at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:401)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1123)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:298)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:133)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3789)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2856)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:221)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:265)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:524)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:516)
我的代码是:
val restService = new RestService("http://localhost:8081")
val valueRestResponseSchema = restService.getLatestVersion("jsontesting1")
val jsonSchema = valueRestResponseSchema.getSchema
有人可以帮助如何从这个有效负载中提取 schema
。
成功了!我使用了 restService.getLatestVersionSchemaOnly("jsontesting1")
并且它只获取了模式!
谢谢!
我在我的 spark 应用程序中使用 confluent-kafka-schemaregistry 客户端。我的用例是我想从注册表中读取 JSON 模式并在我的应用程序中处理它。
当我注册架构时,我使用了一个参数 schemaType
和 JSON
作为值以及 schema
参数。
但是,我在读取 spark 应用程序中的架构时遇到以下异常。
[spark-listener-group-appStatus] ERROR test.JobListener - Batch failed:com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "schemaType" (class io.confluent.kafka.schemaregistry.client.rest.entities.Schema), not marked as ignorable (4 known properties: "version", "schema", "id", "subject"])
at [Source: sun.net.www.protocol.http.HttpURLConnection$HttpInputStream@6a228f09; line: 1, column: 209] (through reference chain: io.confluent.kafka.schemaregistry.client.rest.entities.Schema["schemaType"])
at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:62)
at com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:855)
at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1083)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1389)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1343)
at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:401)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1123)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:298)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:133)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3789)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2856)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:221)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:265)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:524)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:516)
我的代码是:
val restService = new RestService("http://localhost:8081")
val valueRestResponseSchema = restService.getLatestVersion("jsontesting1")
val jsonSchema = valueRestResponseSchema.getSchema
有人可以帮助如何从这个有效负载中提取 schema
。
成功了!我使用了 restService.getLatestVersionSchemaOnly("jsontesting1")
并且它只获取了模式!
谢谢!