来自具有异构 JSON 结构的主题的 KSQL 流
KSQL stream from topic with heterogeneous JSON structures
有没有一种方法可以从指定整个记录应被视为 VARCHAR 的主题创建流,以便我可以使用 extractjsonfield() 从中创建流?示例记录可能类似于:
{
"Header": {
"RecType": "RecA",
... more header records in a fairly consistent format ...
},
"RAFld1": {
"someFld": "some data",
"someOtherField": 1.001,
},
"RAFld2": {
"aFld": "data",
"anotherFld": 98.6,
...
},
...
}
但下一条记录可能如下所示:
{
"Header": {
"RecType": "RecB",
... more header records in a fairly consistent format ...
},
"RBFld1": {
"randomFld": "random data",
"randomOtherField": 1.001,
...
}
}
我可以弄清楚如何将具有已知字段的初始流定义为 VARCHAR 类型,然后使用 extractjsonfield()(使用适当的 where 子句),但看不出有什么方法可以说明顶级结构不具有一致命名的字段。
这是我输入主题的格式;我无法更改该格式。我希望 KSQL 将成为一个优雅的解决方案,但我似乎从一开始就因为无法处理这种动态结构而陷入困境。
如果您在架构中命名的字段没有出现在每条消息中,那也没关系;你只会得到 null
个值。
我认为您的问题很有趣,并且已经写了一篇关于 KSQL 如何在此处工作的解释 - 如果您想用它做其他事情,请告诉我,我可以扩展答案。
查看原始数据:
ksql> PRINT 'source_data' FROM BEGINNING;
Format:JSON
{"ROWTIME":1545239521600,"ROWKEY":"null","Header":{"RecType":"RecA"},"RAFld1":{"someFld":"some data","someOtherField":1.001},"RAFld2":{"aFld":"data","anotherFld":98.6}}
{"ROWTIME":1545239526600,"ROWKEY":"null","Header":{"RecType":"RecB"},"RBFld1":{"randomFld":"random data","randomOtherField":1.001}}
注册 source_data
主题以用作名为 my_stream
的 KSQL 流:
CREATE STREAM my_stream (Header VARCHAR, \
RAFld1 VARCHAR, \
RAFld2 VARCHAR, \
RBFld1 VARCHAR) \
WITH (KAFKA_TOPIC='source_data', VALUE_FORMAT='JSON');
检查消息。请注意,在第二条消息(记录类型 "B")中,'RAFld1' 没有值,因此显示 null
:
ksql> SELECT Header, RAFld1 FROM my_stream LIMIT 2;
{"RecType":"RecA"} | {"someOtherField":1.001,"someFld":"some data"}
{"RecType":"RecB"} | null
仅使用记录类型 "A" 值填充新的 Kafka 主题,使用 EXTRACTFROMJSON
过滤 Header 值上的记录类型,并从有效负载中提取命名字段:
CREATE STREAM recA_data WITH (VALUE_FORMAT='AVRO') AS \
SELECT EXTRACTJSONFIELD(RAFld1,'$.someOtherField') AS someOtherField, \
EXTRACTJSONFIELD(RAFld1,'$.someFld') AS someFld, \
EXTRACTJSONFIELD(RAFld2,'$.aFld') AS aFld, \
EXTRACTJSONFIELD(RAFld2,'$.anotherFld') AS anotherFld \
FROM my_stream \
WHERE EXTRACTJSONFIELD(Header,'$.RecType') = 'RecA';
请注意,序列化正在切换到 Avro,以便任何消费者都可以自动使用该架构,而无需手动声明它。
观察新流有一个模式,并且在消息到达原始 source_data
主题时不断填充消息:
ksql> DESCRIBE recA_data;
Name : RECA_DATA
Field | Type
--------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
SOMEOTHERFIELD | VARCHAR(STRING)
SOMEFLD | VARCHAR(STRING)
AFLD | VARCHAR(STRING)
ANOTHERFLD | VARCHAR(STRING)
--------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql> SELECT * FROM recA_data;
1545240188787 | null | 1.001 | some data | data | 98.6
有没有一种方法可以从指定整个记录应被视为 VARCHAR 的主题创建流,以便我可以使用 extractjsonfield() 从中创建流?示例记录可能类似于:
{
"Header": {
"RecType": "RecA",
... more header records in a fairly consistent format ...
},
"RAFld1": {
"someFld": "some data",
"someOtherField": 1.001,
},
"RAFld2": {
"aFld": "data",
"anotherFld": 98.6,
...
},
...
}
但下一条记录可能如下所示:
{
"Header": {
"RecType": "RecB",
... more header records in a fairly consistent format ...
},
"RBFld1": {
"randomFld": "random data",
"randomOtherField": 1.001,
...
}
}
我可以弄清楚如何将具有已知字段的初始流定义为 VARCHAR 类型,然后使用 extractjsonfield()(使用适当的 where 子句),但看不出有什么方法可以说明顶级结构不具有一致命名的字段。
这是我输入主题的格式;我无法更改该格式。我希望 KSQL 将成为一个优雅的解决方案,但我似乎从一开始就因为无法处理这种动态结构而陷入困境。
如果您在架构中命名的字段没有出现在每条消息中,那也没关系;你只会得到 null
个值。
我认为您的问题很有趣,并且已经写了一篇关于 KSQL 如何在此处工作的解释 - 如果您想用它做其他事情,请告诉我,我可以扩展答案。
查看原始数据:
ksql> PRINT 'source_data' FROM BEGINNING; Format:JSON {"ROWTIME":1545239521600,"ROWKEY":"null","Header":{"RecType":"RecA"},"RAFld1":{"someFld":"some data","someOtherField":1.001},"RAFld2":{"aFld":"data","anotherFld":98.6}} {"ROWTIME":1545239526600,"ROWKEY":"null","Header":{"RecType":"RecB"},"RBFld1":{"randomFld":"random data","randomOtherField":1.001}}
注册
source_data
主题以用作名为my_stream
的 KSQL 流:CREATE STREAM my_stream (Header VARCHAR, \ RAFld1 VARCHAR, \ RAFld2 VARCHAR, \ RBFld1 VARCHAR) \ WITH (KAFKA_TOPIC='source_data', VALUE_FORMAT='JSON');
检查消息。请注意,在第二条消息(记录类型 "B")中,'RAFld1' 没有值,因此显示
null
:ksql> SELECT Header, RAFld1 FROM my_stream LIMIT 2; {"RecType":"RecA"} | {"someOtherField":1.001,"someFld":"some data"} {"RecType":"RecB"} | null
仅使用记录类型 "A" 值填充新的 Kafka 主题,使用
EXTRACTFROMJSON
过滤 Header 值上的记录类型,并从有效负载中提取命名字段:CREATE STREAM recA_data WITH (VALUE_FORMAT='AVRO') AS \ SELECT EXTRACTJSONFIELD(RAFld1,'$.someOtherField') AS someOtherField, \ EXTRACTJSONFIELD(RAFld1,'$.someFld') AS someFld, \ EXTRACTJSONFIELD(RAFld2,'$.aFld') AS aFld, \ EXTRACTJSONFIELD(RAFld2,'$.anotherFld') AS anotherFld \ FROM my_stream \ WHERE EXTRACTJSONFIELD(Header,'$.RecType') = 'RecA';
请注意,序列化正在切换到 Avro,以便任何消费者都可以自动使用该架构,而无需手动声明它。
观察新流有一个模式,并且在消息到达原始
source_data
主题时不断填充消息:ksql> DESCRIBE recA_data; Name : RECA_DATA Field | Type -------------------------------------------- ROWTIME | BIGINT (system) ROWKEY | VARCHAR(STRING) (system) SOMEOTHERFIELD | VARCHAR(STRING) SOMEFLD | VARCHAR(STRING) AFLD | VARCHAR(STRING) ANOTHERFLD | VARCHAR(STRING) -------------------------------------------- For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>; ksql> SELECT * FROM recA_data; 1545240188787 | null | 1.001 | some data | data | 98.6