如何使用 EXTRACTJSONFIELD(message, '$.payload.version') 创建 KSQLdb 流

How to create KSQLdb stream using EXTRACTJSONFIELD(message, '$.payload.version')

如何使用 EXTRACTJSONFIELD 创建 value_type JSON 的 KSQLdb 流?这会使用 select 语句来完成吗?我不清楚在使用 EXTRACTJSONFIELD 运算符创建流期间如何为流定义字段名称。

谢谢

您使用 AS 作为字段名称的别名。这是一个例子。

虚拟源数据:

 ksql> PRINT 'source_data' FROM BEGINNING;
 Format:JSON
 {"ROWTIME":1545239521600,"ROWKEY":"null","Header":{"RecType":"RecA"},"RAFld1":{"someFld":"some data","someOtherField":1.001},"RAFld2":{"aFld":"data","anotherFld":98.6}}
 {"ROWTIME":1545239526600,"ROWKEY":"null","Header":{"RecType":"RecB"},"RBFld1":{"randomFld":"random data","randomOtherField":1.001}}

声明源流

 CREATE STREAM my_stream (Header VARCHAR, 
                          RAFld1 VARCHAR, 
                          RAFld2 VARCHAR, 
                          RBFld1 VARCHAR) 
 WITH (KAFKA_TOPIC='source_data', VALUE_FORMAT='JSON');

创建派生流

 CREATE STREAM recA_data WITH (VALUE_FORMAT='AVRO') AS 
 SELECT EXTRACTJSONFIELD(RAFld1,'$.someOtherField') AS someOtherField, 
         EXTRACTJSONFIELD(RAFld1,'$.someFld')        AS someFld, 
         EXTRACTJSONFIELD(RAFld2,'$.aFld')           AS aFld, 
         EXTRACTJSONFIELD(RAFld2,'$.anotherFld')     AS anotherFld 
         FROM my_stream \
 WHERE EXTRACTJSONFIELD(Header,'$.RecType') = 'RecA';

(Source)