如何使用 EXTRACTJSONFIELD(message, '$.payload.version') 创建 KSQLdb 流
How to create KSQLdb stream using EXTRACTJSONFIELD(message, '$.payload.version')
如何使用 EXTRACTJSONFIELD 创建 value_type JSON 的 KSQLdb 流?这会使用 select 语句来完成吗?我不清楚在使用 EXTRACTJSONFIELD 运算符创建流期间如何为流定义字段名称。
谢谢
您使用 AS
作为字段名称的别名。这是一个例子。
虚拟源数据:
ksql> PRINT 'source_data' FROM BEGINNING;
Format:JSON
{"ROWTIME":1545239521600,"ROWKEY":"null","Header":{"RecType":"RecA"},"RAFld1":{"someFld":"some data","someOtherField":1.001},"RAFld2":{"aFld":"data","anotherFld":98.6}}
{"ROWTIME":1545239526600,"ROWKEY":"null","Header":{"RecType":"RecB"},"RBFld1":{"randomFld":"random data","randomOtherField":1.001}}
声明源流
CREATE STREAM my_stream (Header VARCHAR,
RAFld1 VARCHAR,
RAFld2 VARCHAR,
RBFld1 VARCHAR)
WITH (KAFKA_TOPIC='source_data', VALUE_FORMAT='JSON');
创建派生流
CREATE STREAM recA_data WITH (VALUE_FORMAT='AVRO') AS
SELECT EXTRACTJSONFIELD(RAFld1,'$.someOtherField') AS someOtherField,
EXTRACTJSONFIELD(RAFld1,'$.someFld') AS someFld,
EXTRACTJSONFIELD(RAFld2,'$.aFld') AS aFld,
EXTRACTJSONFIELD(RAFld2,'$.anotherFld') AS anotherFld
FROM my_stream \
WHERE EXTRACTJSONFIELD(Header,'$.RecType') = 'RecA';
(Source)
如何使用 EXTRACTJSONFIELD 创建 value_type JSON 的 KSQLdb 流?这会使用 select 语句来完成吗?我不清楚在使用 EXTRACTJSONFIELD 运算符创建流期间如何为流定义字段名称。
谢谢
您使用 AS
作为字段名称的别名。这是一个例子。
虚拟源数据:
ksql> PRINT 'source_data' FROM BEGINNING;
Format:JSON
{"ROWTIME":1545239521600,"ROWKEY":"null","Header":{"RecType":"RecA"},"RAFld1":{"someFld":"some data","someOtherField":1.001},"RAFld2":{"aFld":"data","anotherFld":98.6}}
{"ROWTIME":1545239526600,"ROWKEY":"null","Header":{"RecType":"RecB"},"RBFld1":{"randomFld":"random data","randomOtherField":1.001}}
声明源流
CREATE STREAM my_stream (Header VARCHAR,
RAFld1 VARCHAR,
RAFld2 VARCHAR,
RBFld1 VARCHAR)
WITH (KAFKA_TOPIC='source_data', VALUE_FORMAT='JSON');
创建派生流
CREATE STREAM recA_data WITH (VALUE_FORMAT='AVRO') AS
SELECT EXTRACTJSONFIELD(RAFld1,'$.someOtherField') AS someOtherField,
EXTRACTJSONFIELD(RAFld1,'$.someFld') AS someFld,
EXTRACTJSONFIELD(RAFld2,'$.aFld') AS aFld,
EXTRACTJSONFIELD(RAFld2,'$.anotherFld') AS anotherFld
FROM my_stream \
WHERE EXTRACTJSONFIELD(Header,'$.RecType') = 'RecA';
(Source)