尝试将 JSON 转换为 AVRO 时没有类型错误
No Type Error when trying to transform JSON to AVRO
我正在尝试将 JSON 有效负载转换为 Avro 以发布到 Kafka 主题。但是,当我进行 Dataweave 转换时,出现“无类型”错误。我不确定是什么导致了错误。我最初认为这可能是由于转换不知道入站负载的 MIME 类型。因此,我已确保将其设置为 application/json
但这没有任何区别。
Avro 架构
{
"compatibility" : "forward",
"name": "ContentManagerCoupons",
"type": "record",
"namespace": "com.rentpath",
"fields": [
{
"name": "clientID",
"type": "string"
},
{
"name": "outputHistoryId",
"type": "string"
},
{
"name": "categoryCoupons",
"type": {
"type": "array",
"items": {
"name": "categoryCoupons_record",
"type": "record",
"fields": [
{
"name": "applyBy",
"type": [
"string",
"int",
"null"
]
},
{
"name": "applyPeriod",
"type": [
"string",
"null"
]
},
{
"name": "cashValue",
"type": [
"int",
"null"
]
},
{
"name": "couponCategory",
"type": "string"
},
{
"name": "cashOffDesc",
"type": [
"string",
"null"
]
},
{
"name": "endDate",
"type": [
"string",
"null"
]
},
{
"name": "feeType",
"type": [
"string",
"null"
]
},
{
"name": "freeWeeks",
"type": [
"string",
"null"
]
},
{
"name": "generatedText",
"type": "string"
},
{
"name": "leaseby",
"type": [
"string",
"null"
]
},
{
"name": "leaseTerm",
"type": [
"int",
"null"
]
},
{
"name": "offerText",
"type": [
"string",
"null"
]
},
{
"name": "startDate",
"type": "string"
},
{
"name": "unitType",
"type": [
"string",
"null"
]
}
]
}
}
}
]
}
JSON留言
{
"outputHistoryId": "55324456",
"clientID": "112345",
"categoryCoupons": [
{
"unitType": null,
"startDate": "07/21/2020",
"offerText": "This would be the special offer message.",
"leaseTerm": null,
"leaseby": null,
"generatedText": "This would be the special offer message..",
"freeWeeks": null,
"feeType": null,
"endDate": "10/01/2020",
"couponCategory": "Special Offer",
"cashValue": null,
"cashOffDesc": null,
"applyPeriod": null,
"applyBy": null
}
]
}
数据波
%dw 2.2
output application/avro schemaUrl="http://schema-registry.domain.com:8081/subjects/Coupon-value/versions/1"
---
payload
错误信息
"org.apache.avro.SchemaParseException - No type: {"subject":"ContentManager.Coupon-value","version":1,"id":342,"schema":"{"type":"record","name":"ContentManagerCoupons","namespace":"com.rentpath","fields":[{"name":"clientID","type":"string"},{"name":"outputHistoryId","type":"string"},{"name":"categoryCoupons","type":{"type":"array","items":{"type":"record","name":"categoryCoupons_record","fields":[{"name":"applyBy","type":["string","int","null"]},{"name":"applyPeriod","type":["string","null"]},{"name":"cashValue","type":["int","null"]},{"name":"couponCategory","type":"string"},{"name":"cashOffDesc","type":["string","null"]},{"name":"endDate","type":["string","null"]},{"name":"feeType","type":["string","null"]},{"name":"freeWeeks","type":["string","null"]},{"name":"generatedText","type":"string"},{"name":"leaseby","type":["string","null"]},{"name":"leaseTerm","type":["int","null"]},{"name":"offerText","type":["string","null"]},{"name":"startDate","type":"string"},{"name":"unitType","type":["string","null"]}]}}}],"compatibility":"forward"}"}
org.apache.avro.SchemaParseException: No type: {"subject":"ContentManager.Coupon-value","version":1,"id":342,"schema":"{"type":"record","name":"ContentManagerCoupons","namespace":"com.rentpath","fields":[{"name":"clientID","type":"string"},{"name":"outputHistoryId","type":"string"},{"name":"categoryCoupons","type":{"type":"array","items":{"type":"record","name":"categoryCoupons_record","fields":[{"name":"applyBy","type":["string","int","null"]},{"name":"applyPeriod","type":["string","null"]},{"name":"cashValue","type":["int","null"]},{"name":"couponCategory","type":"string"},{"name":"cashOffDesc","type":["string","null"]},{"name":"endDate","type":["string","null"]},{"name":"feeType","type":["string","null"]},{"name":"freeWeeks","type":["string","null"]},{"name":"generatedText","type":"string"},{"name":"leaseby","type":["string","null"]},{"name":"leaseTerm","type":["int","null"]},{"name":"offerText","type":["string","null"]},{"name":"startDate","type":"string"},{"name":"unitType","type":["string","null"]}]}}}],"compatibility":"forward"}"}
at org.apache.avro.Schema.getRequiredText(Schema.java:1753)
at org.apache.avro.Schema.parse(Schema.java:1604)
at org.apache.avro.Schema$Parser.parse(Schema.java:1394)
at org.apache.avro.Schema$Parser.parse(Schema.java:1365)
at org.mule.weave.v2.module.avro.AvroWriter.doWriteValue(AvroWriter.scala:195)
at org.mule.weave.v2.module.writer.Writer.writeValue(Writer.scala:41)
at org.mule.weave.v2.module.writer.Writer.writeValue$(Writer.scala:39)
at org.mule.weave.v2.module.avro.AvroWriter.writeValue(AvroWriter.scala:44)
at org.mule.weave.v2.module.writer.DeferredWriter.doWriteValue(DeferredWriter.scala:73)
at org.mule.weave.v2.module.writer.Writer.writeValue(Writer.scala:41)
at org.mule.weave.v2.module.writer.Writer.writeValue$(Writer.scala:39)
at org.mule.weave.v2.module.writer.DeferredWriter.writeValue(DeferredWriter.scala:16)
at org.mule.weave.v2.module.writer.WriterHelper$.writeValue(Writer.scala:120)
at org.mule.weave.v2.module.writer.WriterHelper$.writeAndGetResult(Writer.scala:98)
at org.mule.weave.v2.interpreted.InterpretedMappingExecutableWeave.write(InterpreterMappingCompilerPhase.scala:236)
at org.mule.weave.v2.el.WeaveExpressionLanguageSession.evaluateWithTimeout(WeaveExpressionLanguageSession.scala:243)
at org.mule.weave.v2.el.WeaveExpressionLanguageSession.evaluate(WeaveExpressionLanguageSession.scala:108)
at org.mule.runtime.core.internal.el.dataweave.DataWeaveExpressionLanguageAdaptor.evaluate(DataWeaveExpressionLanguageAdaptor.java:308)
at org.mule.runtime.core.internal.el.DefaultExpressionManagerSession.evaluate(DefaultExpressionManagerSession.java:105)
at com.mulesoft.mule.runtime.core.internal.processor.SetPayloadTransformationTarget.process(SetPayloadTransformationTarget.java:32)
at com.mulesoft.mule.runtime.core.internal.processor.TransformMessageProcessor.lambda[=15=](TransformMessageProcessor.java:92)
at java.util.Optional.ifPresent(Optional.java:159)
at com.mulesoft.mule.runtime.core.internal.processor.TransformMessageProcessor.process(TransformMessageProcessor.java:92)
at org.mule.runtime.core.api.util.func.CheckedFunction.apply(CheckedFunction.java:25)
at org.mule.runtime.core.api.rx.Exceptions.lambda$checkedFunction(Exceptions.java:84)
at org.mule.runtime.core.internal.util.rx.Operators.lambda$nullSafeMap[=15=](Operators.java:47)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:165)
at org.mule.runtime.core.privileged.processor.chain.AbstractMessageProcessorChain.onNext(AbstractMessageProcessorChain.java:425)
at org.mule.runtime.core.privileged.processor.chain.AbstractMessageProcessorChain.onNext(AbstractMessageProcessorChain.java:420)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:127)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:204)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:345)
at reactor.core.publisher.FluxSubscribeOnValue$ScheduledScalar.run(FluxSubscribeOnValue.java:178)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:50)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:27)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.mule.service.scheduler.internal.AbstractRunnableFutureDecorator.doRun(AbstractRunnableFutureDecorator.java:111)
at org.mule.service.scheduler.internal.RunnableFutureDecorator.run(RunnableFutureDecorator.java:54)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748), while writing Avro at payload.
这似乎对我有用。尝试访问架构时可能出现问题。因为我无权访问那个 URL 我用本地文件替换了它:
output application/avro schemaUrl="classpath://schema.json"
显然答案很简单。我只需要将 schema
添加到 URL 的末尾。这将删除没有它的版本和 id 等无关项。
新数据编织
%dw 2.2
output application/avro schemaUrl="http://schema-registry.domain.com:8081/subjects/Coupon-value/versions/1/schema"
---
payload
我正在尝试将 JSON 有效负载转换为 Avro 以发布到 Kafka 主题。但是,当我进行 Dataweave 转换时,出现“无类型”错误。我不确定是什么导致了错误。我最初认为这可能是由于转换不知道入站负载的 MIME 类型。因此,我已确保将其设置为 application/json
但这没有任何区别。
Avro 架构
{
"compatibility" : "forward",
"name": "ContentManagerCoupons",
"type": "record",
"namespace": "com.rentpath",
"fields": [
{
"name": "clientID",
"type": "string"
},
{
"name": "outputHistoryId",
"type": "string"
},
{
"name": "categoryCoupons",
"type": {
"type": "array",
"items": {
"name": "categoryCoupons_record",
"type": "record",
"fields": [
{
"name": "applyBy",
"type": [
"string",
"int",
"null"
]
},
{
"name": "applyPeriod",
"type": [
"string",
"null"
]
},
{
"name": "cashValue",
"type": [
"int",
"null"
]
},
{
"name": "couponCategory",
"type": "string"
},
{
"name": "cashOffDesc",
"type": [
"string",
"null"
]
},
{
"name": "endDate",
"type": [
"string",
"null"
]
},
{
"name": "feeType",
"type": [
"string",
"null"
]
},
{
"name": "freeWeeks",
"type": [
"string",
"null"
]
},
{
"name": "generatedText",
"type": "string"
},
{
"name": "leaseby",
"type": [
"string",
"null"
]
},
{
"name": "leaseTerm",
"type": [
"int",
"null"
]
},
{
"name": "offerText",
"type": [
"string",
"null"
]
},
{
"name": "startDate",
"type": "string"
},
{
"name": "unitType",
"type": [
"string",
"null"
]
}
]
}
}
}
]
}
JSON留言
{
"outputHistoryId": "55324456",
"clientID": "112345",
"categoryCoupons": [
{
"unitType": null,
"startDate": "07/21/2020",
"offerText": "This would be the special offer message.",
"leaseTerm": null,
"leaseby": null,
"generatedText": "This would be the special offer message..",
"freeWeeks": null,
"feeType": null,
"endDate": "10/01/2020",
"couponCategory": "Special Offer",
"cashValue": null,
"cashOffDesc": null,
"applyPeriod": null,
"applyBy": null
}
]
}
数据波
%dw 2.2
output application/avro schemaUrl="http://schema-registry.domain.com:8081/subjects/Coupon-value/versions/1"
---
payload
错误信息
"org.apache.avro.SchemaParseException - No type: {"subject":"ContentManager.Coupon-value","version":1,"id":342,"schema":"{"type":"record","name":"ContentManagerCoupons","namespace":"com.rentpath","fields":[{"name":"clientID","type":"string"},{"name":"outputHistoryId","type":"string"},{"name":"categoryCoupons","type":{"type":"array","items":{"type":"record","name":"categoryCoupons_record","fields":[{"name":"applyBy","type":["string","int","null"]},{"name":"applyPeriod","type":["string","null"]},{"name":"cashValue","type":["int","null"]},{"name":"couponCategory","type":"string"},{"name":"cashOffDesc","type":["string","null"]},{"name":"endDate","type":["string","null"]},{"name":"feeType","type":["string","null"]},{"name":"freeWeeks","type":["string","null"]},{"name":"generatedText","type":"string"},{"name":"leaseby","type":["string","null"]},{"name":"leaseTerm","type":["int","null"]},{"name":"offerText","type":["string","null"]},{"name":"startDate","type":"string"},{"name":"unitType","type":["string","null"]}]}}}],"compatibility":"forward"}"} org.apache.avro.SchemaParseException: No type: {"subject":"ContentManager.Coupon-value","version":1,"id":342,"schema":"{"type":"record","name":"ContentManagerCoupons","namespace":"com.rentpath","fields":[{"name":"clientID","type":"string"},{"name":"outputHistoryId","type":"string"},{"name":"categoryCoupons","type":{"type":"array","items":{"type":"record","name":"categoryCoupons_record","fields":[{"name":"applyBy","type":["string","int","null"]},{"name":"applyPeriod","type":["string","null"]},{"name":"cashValue","type":["int","null"]},{"name":"couponCategory","type":"string"},{"name":"cashOffDesc","type":["string","null"]},{"name":"endDate","type":["string","null"]},{"name":"feeType","type":["string","null"]},{"name":"freeWeeks","type":["string","null"]},{"name":"generatedText","type":"string"},{"name":"leaseby","type":["string","null"]},{"name":"leaseTerm","type":["int","null"]},{"name":"offerText","type":["string","null"]},{"name":"startDate","type":"string"},{"name":"unitType","type":["string","null"]}]}}}],"compatibility":"forward"}"} at org.apache.avro.Schema.getRequiredText(Schema.java:1753) at org.apache.avro.Schema.parse(Schema.java:1604) at org.apache.avro.Schema$Parser.parse(Schema.java:1394) at org.apache.avro.Schema$Parser.parse(Schema.java:1365) at org.mule.weave.v2.module.avro.AvroWriter.doWriteValue(AvroWriter.scala:195) at org.mule.weave.v2.module.writer.Writer.writeValue(Writer.scala:41) at org.mule.weave.v2.module.writer.Writer.writeValue$(Writer.scala:39) at org.mule.weave.v2.module.avro.AvroWriter.writeValue(AvroWriter.scala:44) at org.mule.weave.v2.module.writer.DeferredWriter.doWriteValue(DeferredWriter.scala:73) at org.mule.weave.v2.module.writer.Writer.writeValue(Writer.scala:41) at org.mule.weave.v2.module.writer.Writer.writeValue$(Writer.scala:39) at org.mule.weave.v2.module.writer.DeferredWriter.writeValue(DeferredWriter.scala:16) at org.mule.weave.v2.module.writer.WriterHelper$.writeValue(Writer.scala:120) at org.mule.weave.v2.module.writer.WriterHelper$.writeAndGetResult(Writer.scala:98) at org.mule.weave.v2.interpreted.InterpretedMappingExecutableWeave.write(InterpreterMappingCompilerPhase.scala:236) at org.mule.weave.v2.el.WeaveExpressionLanguageSession.evaluateWithTimeout(WeaveExpressionLanguageSession.scala:243) at org.mule.weave.v2.el.WeaveExpressionLanguageSession.evaluate(WeaveExpressionLanguageSession.scala:108) at org.mule.runtime.core.internal.el.dataweave.DataWeaveExpressionLanguageAdaptor.evaluate(DataWeaveExpressionLanguageAdaptor.java:308) at org.mule.runtime.core.internal.el.DefaultExpressionManagerSession.evaluate(DefaultExpressionManagerSession.java:105) at com.mulesoft.mule.runtime.core.internal.processor.SetPayloadTransformationTarget.process(SetPayloadTransformationTarget.java:32) at com.mulesoft.mule.runtime.core.internal.processor.TransformMessageProcessor.lambda[=15=](TransformMessageProcessor.java:92) at java.util.Optional.ifPresent(Optional.java:159) at com.mulesoft.mule.runtime.core.internal.processor.TransformMessageProcessor.process(TransformMessageProcessor.java:92) at org.mule.runtime.core.api.util.func.CheckedFunction.apply(CheckedFunction.java:25) at org.mule.runtime.core.api.rx.Exceptions.lambda$checkedFunction(Exceptions.java:84) at org.mule.runtime.core.internal.util.rx.Operators.lambda$nullSafeMap[=15=](Operators.java:47) at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:165) at org.mule.runtime.core.privileged.processor.chain.AbstractMessageProcessorChain.onNext(AbstractMessageProcessorChain.java:425) at org.mule.runtime.core.privileged.processor.chain.AbstractMessageProcessorChain.onNext(AbstractMessageProcessorChain.java:420) at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:127) at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:204) at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:345) at reactor.core.publisher.FluxSubscribeOnValue$ScheduledScalar.run(FluxSubscribeOnValue.java:178) at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:50) at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:27) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.mule.service.scheduler.internal.AbstractRunnableFutureDecorator.doRun(AbstractRunnableFutureDecorator.java:111) at org.mule.service.scheduler.internal.RunnableFutureDecorator.run(RunnableFutureDecorator.java:54) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748), while writing Avro at payload.
这似乎对我有用。尝试访问架构时可能出现问题。因为我无权访问那个 URL 我用本地文件替换了它:
output application/avro schemaUrl="classpath://schema.json"
显然答案很简单。我只需要将 schema
添加到 URL 的末尾。这将删除没有它的版本和 id 等无关项。
新数据编织
%dw 2.2
output application/avro schemaUrl="http://schema-registry.domain.com:8081/subjects/Coupon-value/versions/1/schema"
---
payload