如何在 Apache Kafka 中使用 AVRO 序列化处理嵌套源数据?

How do you handle nested source data with AVRO serialization in Apache Kafka?

我的目标是从 HTTP 源获取 JSON 数据并使用 AVRO 序列化将其存储在 Kafka 主题中。

使用 Kafka Connect 和 HTTP source connector 以及一堆 SMT,我设法创建了一个 Connect 数据结构,在使用 StringConverter 写入主题时看起来像这样:

Struct{base=stations,cod=200,coord=Struct{lat=54.0,lon=9.0},dt=1632150605}

因此 JSON 已成功解析为 STRUCT,我可以使用 SMT 操作单个元素。接下来,我在 Confluent Schema Registry 中创建了一个具有相应模式的新主题,并使用 "value.converter": "io.confluent.connect.avro.AvroConverter".

将连接器的值转换器切换到 Confluent AVRO 转换器

我收到一条错误消息,而不是预期的序列化:

org.apache.kafka.common.errors.SerializationException: Error serializing Avro message Caused by: org.apache.avro.SchemaParseException: Can't redefine: io.confluent.connect.avro.ConnectDefault

一旦我使用 ReplaceField 删除了嵌套的 STRUCT 或使用 Flatten 简化了结构,AVRO 序列化就像一个魅力。所以看起来转换器无法处理嵌套结构。

当您有嵌套元素并希望它们被序列化而不是将 JSON 存储为 String 并尝试处理消费者或其他对象的对象创建时,正确的方法是什么?这在 Kafka Connect 中可能吗?

从 JSON 字符串创建 STRUCT 元素可以通过不同的方式实现。最初,SMT ExpandJson 因其简单性而被使用。但是,它不会创建足够命名的 STRUCT,因为它没有可以处理的模式。这就是导致初始错误消息的原因,因为 AVRO 序列化程序对这些结构使用通用的 class io.confluent.connect.avro.ConnectDefault,如果存在多个结构,就会出现歧义,从而引发异常。

另一个看起来做同样事情的 SMT 是 Json Schema,它有一个记录的 FromJson 转换。它确实接受一个模式,从而解决了 ExpandJson 将嵌套元素解析为通用类型的问题。不过,接受的是 JSON 架构,并且通过将单词“properties”作为命名空间并复制字段名称来映射到 AVRO 全名。在此示例中,您将以 properties.coord 作为内部元素的全名。

例如,当以下 JSON 架构传递给 SMT 时:

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "type": "object",
  "properties": {
    "coord": {
      "type": "object",
      "properties": {
        "lon": {
          "type": "number"
        },
        "lat": {
          "type": "number"
        }
      },
      "required": [
        "lon",
        "lat"
      ]
    },
    ...
}

它生成的 AVRO 模式(因此在模式注册表中查找)是:

{
    "type": "record",
    "fields": [
        ...
        {
            "name": "coord",
            "type": {
                "type": "record",
                "name": "coord",
                "namespace": "properties",
                "fields": [
                    {
                        "name": "lat",
                        "type": "double"
                    },
                    {
                        "name": "lon",
                        "type": "double"
                    }
                ],
                "connect.name": "properties.coord"
            }
        },
    ...
}

理论上,如果您在第二层有另一个带有 coord 元素的模式,它将获得相同的全名,但由于这些不是需要引用的模式注册表中的单独条目,这不会导致碰撞。无法从 JSON 模式控制 AVRO 记录的命名空间有点遗憾,因为感觉就像你就在那里,但我还没有能够深入挖掘提供解决方案。

建议的 SMT SetSchemaMetadata(请参阅问题的第一个答复)在此过程中很有用,但 it's documentation 与 AVRO 命名约定有点冲突,因为它在示例中显示 order-value。它将尝试查找包含以该名称作为根元素的 AVRO 记录的模式,并且由于“-”是 AVRO 名称中的非法字符,您会收到错误消息。但是,如果您使用根元素的正确名称,SMT 会做一些非常有用的事情:它的 RestService class 查询模式注册表以查找匹配的模式,失败并打印一条消息需要创建的确切模式定义,因此您不必记住所有转换规则。

所以原问题的答案是:是的,可以用Kafka Connect来完成。如果您

,这也是最好的选择
  • 不想自己写producer/connector
  • 想要以类型化的方式存储 JSON blob,而不是在它们到达初始主题后转换它们

如果可以选择在数据摄取后进行转换,de-, re- and serialization capabilities of ksqlDB 似乎非常强大。