Druid Kafka indexing service fails to consume Avro data (AvroTypeException: Invalid default for field)

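The Apache Druid Kafka indexing service is configured with avro_stream as the parser type, and avroBytesDecoder is set to type schema_registry with url pointing at the schema registry. When consuming Kafka data with logParseExceptions set to true, the following error is logged: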
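For reference, the configuration described above corresponds roughly to the following Kafka supervisor spec fragment. This is a sketch, not the poster's actual spec: the values in angle brackets are placeholders, and other required sections (ioConfig, granularitySpec, metricsSpec) are omitted.

```json
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "<datasource-name>",
    "parser": {
      "type": "avro_stream",
      "avroBytesDecoder": {
        "type": "schema_registry",
        "url": "<schema-registry-url>"
      },
      "parseSpec": {
        "format": "avro",
        "timestampSpec": { "column": "<timestamp-column>", "format": "auto" },
        "dimensionsSpec": { "dimensions": [] }
      }
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "logParseExceptions": true
  }
}
```

With logParseExceptions enabled, each row that fails to decode is logged with its partition and offset, which is how the stack trace below was captured.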

2021-01-06T02:48:33,022 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Encountered parse exception on row from partition[10] sequenceNumber[101782492]
org.apache.druid.java.util.common.parsers.ParseException: Fail to decode avro message!
  at org.apache.druid.data.input.avro.SchemaRegistryBasedAvroBytesDecoder.parse(SchemaRegistryBasedAvroBytesDecoder.java:71) ~[?:?]
  at org.apache.druid.data.input.AvroStreamInputRowParser.parseBatch(AvroStreamInputRowParser.java:58) ~[?:?]
  at org.apache.druid.data.input.AvroStreamInputRowParser.parseBatch(AvroStreamInputRowParser.java:36) ~[?:?]
  at org.apache.druid.segment.transform.TransformingInputRowParser.parseBatch(TransformingInputRowParser.java:50) ~[druid-processing-0.16.0-incubating.jar:0.16.0-incubating]
  at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:604) [druid-indexing-service-0.16.0-incubating.jar:0.16.0-incubating]
  at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:259) [druid-indexing-service-0.16.0-incubating.jar:0.16.0-incubating]
  at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:177) [druid-indexing-service-0.16.0-incubating.jar:0.16.0-incubating]
  at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:419) [druid-indexing-service-0.16.0-incubating.jar:0.16.0-incubating]
  at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:391) [druid-indexing-service-0.16.0-incubating.jar:0.16.0-incubating]
  at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_252]
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: org.apache.avro.AvroTypeException: Invalid default for field campaign_type: 1 not a ["null","int"]
  at org.apache.avro.Schema.validateDefault(Schema.java:1512) ~[?:?]
  at org.apache.avro.Schema.access0(Schema.java:86) ~[?:?]
  at org.apache.avro.Schema$Field.<init>(Schema.java:493) ~[?:?]
  at org.apache.avro.Schema.parse(Schema.java:1619) ~[?:?]
  at org.apache.avro.Schema$Parser.parse(Schema.java:1366) ~[?:?]
  at org.apache.avro.Schema$Parser.parse(Schema.java:1354) ~[?:?]
  at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:63) ~[?:?]
  at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndID(CachedSchemaRegistryClient.java:117) ~[?:?]
  at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getByID(CachedSchemaRegistryClient.java:99) ~[?:?]
  at org.apache.druid.data.input.avro.SchemaRegistryBasedAvroBytesDecoder.parse(SchemaRegistryBasedAvroBytesDecoder.java:66) ~[?:?]
  ... 12 more

Here is how that field is defined in my Avro schema:

{ "name": "campaign_type", "type": ["null", "int"], "default": 1}

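I think you need to flip the order of the types in the union. The check is not in Druid itself but in the Avro library (org.apache.avro.Schema.validateDefault in the stack trace): the Avro specification requires the default value of a union field to match the first type listed in the union. With ["null", "int"] the only valid default is null, so a default of 1 requires int to come first: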

{ "name": "campaign_type", "type": ["int", "null"], "default": 1}
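In the context of a complete record schema, the reordered field might look like this. The record name CampaignEvent and the namespace com.example are hypothetical placeholders.

```json
{
  "type": "record",
  "name": "CampaignEvent",
  "namespace": "com.example",
  "fields": [
    { "name": "campaign_type", "type": ["int", "null"], "default": 1 }
  ]
}
```

If a null default is acceptable instead, keeping ["null", "int"] and changing the default to null also satisfies Avro's rule, since null is then the first branch.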

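Edit: According to the Druid documentation, Druid does not support union types other than ["null", <other type>]. So it may not be possible to get a non-null default through the schema. However, you may be able to achieve this through the ingestion spec.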
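One possible way to apply the default at ingestion time, sketched under assumptions: keep the field as ["null", "int"] with "default": null in the registered schema (the error above is raised while the schema itself is parsed, so it has to be valid Avro), then add a transformSpec to the dataSchema that replaces nulls with 1 using Druid's nvl expression function. A transform whose name matches an input field shadows that field. Avro defaults are only used during schema resolution, when the reader's schema has a field the writer's data lacks, so they would not replace nulls the producer actually wrote anyway. Whether a null reaches the expression as null may depend on Druid's null-handling setting (druid.generic.useDefaultValueForNull), so verify the result on sample data. Only the relevant part of dataSchema is shown.

```json
{
  "dataSchema": {
    "transformSpec": {
      "transforms": [
        {
          "type": "expression",
          "name": "campaign_type",
          "expression": "nvl(campaign_type, 1)"
        }
      ]
    }
  }
}
```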

Reference: https://druid.apache.org/docs/latest/development/extensions-core/avro.html#avro-types