来自具有异构 JSON 结构的主题的 KSQL 流

KSQL stream from topic with heterogeneous JSON structures

有没有一种方法可以从指定整个记录应被视为 VARCHAR 的主题创建流,以便我可以使用 extractjsonfield() 从中创建流?示例记录可能类似于:

{
  "Header": {
    "RecType": "RecA",
    ... more header records in a fairly consistent format ...
  },
  "RAFld1": {
    "someFld": "some data",
    "someOtherField": 1.001,
  },
  "RAFld2": {
    "aFld": "data",
    "anotherFld": 98.6,
    ...
  },
  ...
}

但下一条记录可能如下所示:

{
  "Header": {
    "RecType": "RecB",
    ... more header records in a fairly consistent format ...
  },
  "RBFld1": {
    "randomFld": "random data",
    "randomOtherField": 1.001,
    ...
  }
}

我可以弄清楚如何将具有已知字段的初始流定义为 VARCHAR 类型,然后使用 extractjsonfield()(使用适当的 where 子句),但看不出有什么方法可以说明顶级结构不具有一致命名的字段。

这是我输入主题的格式;我无法更改该格式。我希望 KSQL 将成为一个优雅的解决方案,但我似乎从一开始就因为无法处理这种动态结构而陷入困境。

如果您在架构中命名的字段没有出现在每条消息中,那也没关系;你只会得到 null 个值。

我认为您的问题很有趣,并且已经写了一篇关于 KSQL 如何在此处工作的解释 - 如果您想用它做其他事情,请告诉我,我可以扩展答案。


  1. 查看原始数据:

    ksql> PRINT 'source_data' FROM BEGINNING;
    Format:JSON
    {"ROWTIME":1545239521600,"ROWKEY":"null","Header":{"RecType":"RecA"},"RAFld1":{"someFld":"some data","someOtherField":1.001},"RAFld2":{"aFld":"data","anotherFld":98.6}}
    {"ROWTIME":1545239526600,"ROWKEY":"null","Header":{"RecType":"RecB"},"RBFld1":{"randomFld":"random data","randomOtherField":1.001}}
    
  2. 注册 source_data 主题以用作名为 my_stream 的 KSQL 流:

    CREATE STREAM my_stream (Header VARCHAR, \
                             RAFld1 VARCHAR, \
                             RAFld2 VARCHAR, \
                             RBFld1 VARCHAR) \
    WITH (KAFKA_TOPIC='source_data', VALUE_FORMAT='JSON');
    
  3. 检查消息。请注意,在第二条消息(记录类型 "B")中,'RAFld1' 没有值,因此显示 null

    ksql> SELECT Header, RAFld1 FROM my_stream LIMIT 2;
    {"RecType":"RecA"} | {"someOtherField":1.001,"someFld":"some data"}
    {"RecType":"RecB"} | null
    
  4. 仅使用记录类型 "A" 值填充新的 Kafka 主题,使用 EXTRACTFROMJSON 过滤 Header 值上的记录类型,并从有效负载中提取命名字段:

    CREATE STREAM recA_data WITH (VALUE_FORMAT='AVRO') AS \
    SELECT EXTRACTJSONFIELD(RAFld1,'$.someOtherField') AS someOtherField, \
            EXTRACTJSONFIELD(RAFld1,'$.someFld')        AS someFld, \
            EXTRACTJSONFIELD(RAFld2,'$.aFld')           AS aFld, \
            EXTRACTJSONFIELD(RAFld2,'$.anotherFld')     AS anotherFld \
            FROM my_stream \
    WHERE EXTRACTJSONFIELD(Header,'$.RecType') = 'RecA';
    

    请注意,序列化正在切换到 Avro,以便任何消费者都可以自动使用该架构,而无需手动声明它。

  5. 观察新流有一个模式,并且在消息到达原始 source_data 主题时不断填充消息:

    ksql> DESCRIBE recA_data;
    
    Name                 : RECA_DATA
    Field          | Type
    --------------------------------------------
    ROWTIME        | BIGINT           (system)
    ROWKEY         | VARCHAR(STRING)  (system)
    SOMEOTHERFIELD | VARCHAR(STRING)
    SOMEFLD        | VARCHAR(STRING)
    AFLD           | VARCHAR(STRING)
    ANOTHERFLD     | VARCHAR(STRING)
    --------------------------------------------
    For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
    
    ksql> SELECT * FROM recA_data;
    1545240188787 | null | 1.001 | some data | data | 98.6