尝试将 JSON 转换为 AVRO 时出现 “No type” 错误

No Type Error when trying to transform JSON to AVRO

我正在尝试将 JSON 有效负载转换为 Avro,以便发布到 Kafka 主题。但是,在执行 DataWeave 转换时,出现了 “No type” 错误,我不确定是什么原因导致的。我最初认为,这可能是因为转换不知道入站负载的 MIME 类型,因此我已确保将其设置为 application/json,但这没有任何区别。

Avro 架构

{
  "compatibility" : "forward",
  "name": "ContentManagerCoupons",
  "type": "record",
  "namespace": "com.rentpath",
  "fields": [
    {
      "name": "clientID",
      "type": "string"
    },
{
  "name": "outputHistoryId",
  "type": "string"
},
{
  "name": "categoryCoupons",
  "type": {
    "type": "array",
    "items": {
      "name": "categoryCoupons_record",
      "type": "record",
      "fields": [
        {
          "name": "applyBy",
          "type": [
            "string",
            "int",
            "null"
          ]
        },
        {
          "name": "applyPeriod",
          "type": [
            "string",
            "null"
          ]
        },
        {
          "name": "cashValue",
          "type": [
            "int",
            "null"
          ]
        },
        {
          "name": "couponCategory",
          "type": "string"
        },
        {
          "name": "cashOffDesc",
          "type": [
            "string",
            "null"
          ]
        },
        {
          "name": "endDate",
          "type": [
            "string",
            "null"
          ]
        },
        {
          "name": "feeType",
          "type": [
            "string",
            "null"
          ]
        },
        {
          "name": "freeWeeks",
          "type": [
            "string",
            "null"
          ]
        },
        {
          "name": "generatedText",
          "type": "string"
        },
        {
          "name": "leaseby",
          "type": [
            "string",
            "null"
          ]
        },
        {
          "name": "leaseTerm",
          "type": [
            "int",
            "null"
          ]
        },
        {
          "name": "offerText",
          "type": [
            "string",
            "null"
          ]
        },
        {
          "name": "startDate",
          "type": "string"
        },
        {
          "name": "unitType",
          "type": [
            "string",
            "null"
          ]
        }
      ]
    }
  }
}
]
}

JSON 消息

{
"outputHistoryId": "55324456",
"clientID": "112345",
"categoryCoupons": [
    {
        "unitType": null,
        "startDate": "07/21/2020",
        "offerText": "This would be the special offer message.",
        "leaseTerm": null,
        "leaseby": null,
        "generatedText": "This would be the special offer message..",
        "freeWeeks": null,
        "feeType": null,
        "endDate": "10/01/2020",
        "couponCategory": "Special Offer",
        "cashValue": null,
        "cashOffDesc": null,
        "applyPeriod": null,
        "applyBy": null
    }
]
}

DataWeave 脚本

%dw 2.2
output application/avro schemaUrl="http://schema-registry.domain.com:8081/subjects/Coupon-value/versions/1"
---
payload

错误信息

"org.apache.avro.SchemaParseException - No type: {"subject":"ContentManager.Coupon-value","version":1,"id":342,"schema":"{"type":"record","name":"ContentManagerCoupons","namespace":"com.rentpath","fields":[{"name":"clientID","type":"string"},{"name":"outputHistoryId","type":"string"},{"name":"categoryCoupons","type":{"type":"array","items":{"type":"record","name":"categoryCoupons_record","fields":[{"name":"applyBy","type":["string","int","null"]},{"name":"applyPeriod","type":["string","null"]},{"name":"cashValue","type":["int","null"]},{"name":"couponCategory","type":"string"},{"name":"cashOffDesc","type":["string","null"]},{"name":"endDate","type":["string","null"]},{"name":"feeType","type":["string","null"]},{"name":"freeWeeks","type":["string","null"]},{"name":"generatedText","type":"string"},{"name":"leaseby","type":["string","null"]},{"name":"leaseTerm","type":["int","null"]},{"name":"offerText","type":["string","null"]},{"name":"startDate","type":"string"},{"name":"unitType","type":["string","null"]}]}}}],"compatibility":"forward"}"} org.apache.avro.SchemaParseException: No type: 
{"subject":"ContentManager.Coupon-value","version":1,"id":342,"schema":"{"type":"record","name":"ContentManagerCoupons","namespace":"com.rentpath","fields":[{"name":"clientID","type":"string"},{"name":"outputHistoryId","type":"string"},{"name":"categoryCoupons","type":{"type":"array","items":{"type":"record","name":"categoryCoupons_record","fields":[{"name":"applyBy","type":["string","int","null"]},{"name":"applyPeriod","type":["string","null"]},{"name":"cashValue","type":["int","null"]},{"name":"couponCategory","type":"string"},{"name":"cashOffDesc","type":["string","null"]},{"name":"endDate","type":["string","null"]},{"name":"feeType","type":["string","null"]},{"name":"freeWeeks","type":["string","null"]},{"name":"generatedText","type":"string"},{"name":"leaseby","type":["string","null"]},{"name":"leaseTerm","type":["int","null"]},{"name":"offerText","type":["string","null"]},{"name":"startDate","type":"string"},{"name":"unitType","type":["string","null"]}]}}}],"compatibility":"forward"}"} at org.apache.avro.Schema.getRequiredText(Schema.java:1753) at org.apache.avro.Schema.parse(Schema.java:1604) at org.apache.avro.Schema$Parser.parse(Schema.java:1394) at org.apache.avro.Schema$Parser.parse(Schema.java:1365) at org.mule.weave.v2.module.avro.AvroWriter.doWriteValue(AvroWriter.scala:195) at org.mule.weave.v2.module.writer.Writer.writeValue(Writer.scala:41) at org.mule.weave.v2.module.writer.Writer.writeValue$(Writer.scala:39) at org.mule.weave.v2.module.avro.AvroWriter.writeValue(AvroWriter.scala:44) at org.mule.weave.v2.module.writer.DeferredWriter.doWriteValue(DeferredWriter.scala:73) at org.mule.weave.v2.module.writer.Writer.writeValue(Writer.scala:41) at org.mule.weave.v2.module.writer.Writer.writeValue$(Writer.scala:39) at org.mule.weave.v2.module.writer.DeferredWriter.writeValue(DeferredWriter.scala:16) at org.mule.weave.v2.module.writer.WriterHelper$.writeValue(Writer.scala:120) at 
org.mule.weave.v2.module.writer.WriterHelper$.writeAndGetResult(Writer.scala:98) at org.mule.weave.v2.interpreted.InterpretedMappingExecutableWeave.write(InterpreterMappingCompilerPhase.scala:236) at org.mule.weave.v2.el.WeaveExpressionLanguageSession.evaluateWithTimeout(WeaveExpressionLanguageSession.scala:243) at org.mule.weave.v2.el.WeaveExpressionLanguageSession.evaluate(WeaveExpressionLanguageSession.scala:108) at org.mule.runtime.core.internal.el.dataweave.DataWeaveExpressionLanguageAdaptor.evaluate(DataWeaveExpressionLanguageAdaptor.java:308) at org.mule.runtime.core.internal.el.DefaultExpressionManagerSession.evaluate(DefaultExpressionManagerSession.java:105) at com.mulesoft.mule.runtime.core.internal.processor.SetPayloadTransformationTarget.process(SetPayloadTransformationTarget.java:32) at com.mulesoft.mule.runtime.core.internal.processor.TransformMessageProcessor.lambda$0(TransformMessageProcessor.java:92) at java.util.Optional.ifPresent(Optional.java:159) at com.mulesoft.mule.runtime.core.internal.processor.TransformMessageProcessor.process(TransformMessageProcessor.java:92) at org.mule.runtime.core.api.util.func.CheckedFunction.apply(CheckedFunction.java:25) at org.mule.runtime.core.api.rx.Exceptions.lambda$checkedFunction(Exceptions.java:84) at org.mule.runtime.core.internal.util.rx.Operators.lambda$nullSafeMap$0(Operators.java:47) at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:165) at org.mule.runtime.core.privileged.processor.chain.AbstractMessageProcessorChain.onNext(AbstractMessageProcessorChain.java:425) at org.mule.runtime.core.privileged.processor.chain.AbstractMessageProcessorChain.onNext(AbstractMessageProcessorChain.java:420) at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:127) at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:204) at 
reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:345) at reactor.core.publisher.FluxSubscribeOnValue$ScheduledScalar.run(FluxSubscribeOnValue.java:178) at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:50) at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:27) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.mule.service.scheduler.internal.AbstractRunnableFutureDecorator.doRun(AbstractRunnableFutureDecorator.java:111) at org.mule.service.scheduler.internal.RunnableFutureDecorator.run(RunnableFutureDecorator.java:54) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748), while writing Avro at payload.

这在我这里似乎可以正常工作。问题可能出在访问架构的环节。由于我无权访问那个 URL,我用本地文件替换了它:

output application/avro schemaUrl="classpath://schema.json"

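显然答案很简单:我只需要在 URL 的末尾加上 /schema。不加 /schema 时,Schema Registry 返回的是一个包含 subject、version、id 和 schema 字段的包装对象,Avro 解析器在其顶层找不到 type 字段,于是报 “No type” 错误;加上 /schema 之后,返回的只有架构本身,不再带有 version 和 id 等无关项。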

修改后的 DataWeave 脚本

%dw 2.2
output application/avro schemaUrl="http://schema-registry.domain.com:8081/subjects/Coupon-value/versions/1/schema"
---
payload