Define KSQL STRUCT on JSON valued topic with different types
(Edit: lightly edited to better reflect the intent, but heavily edited since progress has been made.)
主题 "t_raw"
有多种类型的消息,它们都包含一个共同的 "type"
键:
{"type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
{"type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
Ultimately, I need to split these out into other streams, where they will be chopped/aggregated/processed. I would like to be able to use STRUCT for everything, but my current best effort has me doing this:
create stream raw (type varchar, data varchar) \
with (kafka_topic='t_raw', value_format='JSON');
as a first pass, and then
create stream key1 with (TIMESTAMP='ts', timestamp_format='yyyy-MM-dd HH:mm:ss.S') as \
select \
extractjsonfield(data, '$.ts') as ts, \
extractjsonfield(data, '$.a') as a, extractjsonfield(data, '$.b') as b \
from raw where type='key1';
create stream key2 with (TIMESTAMP='ts', timestamp_format='yyyy-MM-dd HH:mm:ss.S') as \
select \
extractjsonfield(data, '$.ts') as ts, \
extractjsonfield(data, '$.a') as a, extractjsonfield(data, '$.c') as c, \
extractjsonfield(data, '$.d') as d \
from raw where type='key2';
This seems to work, but now that STRUCT has been added, is there a way to use it here in place of extractjsonfield?
ksql> select * from key1;
1542741621100 | null | 2018-11-20 19:20:21.1 | 1 | hello
1542741623300 | null | 2018-11-20 19:20:23.3 | 2 | hello2
^CQuery terminated
ksql> select * from key2;
1542741622200 | null | 2018-11-20 19:20:22.2 | 1 | 11 | goodbye
1542741624400 | null | 2018-11-20 19:20:24.4 | 3 | 22 | goodbye2
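One wrinkle with this workaround: extractjsonfield returns VARCHAR, so the numeric fields are carried as strings. A sketch of the key1 stream with explicit casts (assuming a should be an INT; this is untested and not part of my working version above):
-- hypothetical: same as the key1 stream, but with 'a' cast from VARCHAR to INT
create stream key1_typed with (TIMESTAMP='ts', timestamp_format='yyyy-MM-dd HH:mm:ss.S') as \
  select extractjsonfield(data, '$.ts') as ts, \
         cast(extractjsonfield(data, '$.a') as int) as a, \
         extractjsonfield(data, '$.b') as b \
  from raw where type='key1';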
If not with STRUCT, is there a straightforward way to do this with vanilla kafka-streams (vice KSQL, hence the apache-kafka-streams tag)? Is there a more kafka-esque/efficient/elegant way to parse this?
I cannot define it as an empty STRUCT<>:
ksql> CREATE STREAM some_input ( type VARCHAR, data struct<> ) \
WITH (KAFKA_TOPIC='t1', VALUE_FORMAT='JSON');
line 1:52: extraneous input '<>' expecting {',', ')'}
There has been some (not-so-recent) discussion about being able to do something like
CREATE STREAM key1 ( a INT, b VARCHAR ) AS \
SELECT data->* from some_input where type = 'key1';
FYI: the above solution will not work in confluent-5.0.0; a recent patch fixed the extractjsonfield bug and made this solution possible.
The real data has several more similar message types. They all contain "type" and "data" keys (and no others at the top level), and almost all have the "ts" timestamp nested in "data".
Yes, you can do this - KSQL doesn't mind if a column doesn't exist; you just get a null value.
Test data setup
Populate some test data into the topic:
kafkacat -b kafka:29092 -t t_raw -P <<EOF
{"type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
{"type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
EOF
Dump the topic to the KSQL console to inspect it:
ksql> PRINT 't_raw' FROM BEGINNING;
Format:JSON
{"ROWTIME":1542965737436,"ROWKEY":"null","type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
{"ROWTIME":1542965737436,"ROWKEY":"null","type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
{"ROWTIME":1542965737436,"ROWKEY":"null","type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
{"ROWTIME":1542965737437,"ROWKEY":"null","type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
^CTopic printing ceased
ksql>
Model the source stream of data
Create a stream over it. Note the use of STRUCT and the declaration of every possible column:
CREATE STREAM T (TYPE VARCHAR, \
DATA STRUCT< \
TS VARCHAR, \
A INT, \
B VARCHAR, \
C INT, \
D VARCHAR>) \
WITH (KAFKA_TOPIC='t_raw',\
VALUE_FORMAT='JSON');
Set the offset to earliest so that we query the whole topic, then access the full stream with KSQL:
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql>
ksql> SELECT * FROM T;
1542965737436 | null | key1 | {TS=2018-11-20 19:20:21.1, A=1, B=hello, C=null, D=null}
1542965737436 | null | key2 | {TS=2018-11-20 19:20:22.2, A=1, B=null, C=11, D=goodbye}
1542965737436 | null | key1 | {TS=2018-11-20 19:20:23.3, A=2, B=hello2, C=null, D=null}
1542965737437 | null | key2 | {TS=2018-11-20 19:20:24.4, A=3, B=null, C=22, D=goodbye2}
^CQuery terminated
Query the types individually, using the -> operator to access the nested elements:
ksql> SELECT DATA->A,DATA->B FROM T WHERE TYPE='key1' LIMIT 2;
1 | hello
2 | hello2
ksql> SELECT DATA->A,DATA->C,DATA->D FROM T WHERE TYPE='key2' LIMIT 2;
1 | 11 | goodbye
3 | 22 | goodbye2
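Because absent columns simply come back as null, you could also route on a type-specific field instead of TYPE if you ever needed to - a sketch (assuming struct dereferences are permitted in the WHERE clause):
-- hypothetical alternative predicate: only key2 messages populate c
SELECT DATA->A, DATA->C, DATA->D FROM T WHERE DATA->C IS NOT NULL LIMIT 2;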
Persist the data in separate Kafka topics
Populate target topics with the separated data:
ksql> CREATE STREAM TYPE_1 AS SELECT DATA->TS, DATA->A, DATA->B FROM T WHERE TYPE='key1';
Message
----------------------------
Stream created and running
----------------------------
ksql> CREATE STREAM TYPE_2 AS SELECT DATA->TS, DATA->A, DATA->C, DATA->D FROM T WHERE TYPE='key2';
Message
----------------------------
Stream created and running
----------------------------
Schema of the new streams:
ksql> DESCRIBE TYPE_1;
Name : TYPE_1
Field | Type
--------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
DATA__TS | VARCHAR(STRING)
DATA__A | INTEGER
DATA__B | VARCHAR(STRING)
--------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql> DESCRIBE TYPE_2;
Name : TYPE_2
Field | Type
--------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
DATA__TS | VARCHAR(STRING)
DATA__A | INTEGER
DATA__C | INTEGER
DATA__D | VARCHAR(STRING)
--------------------------------------
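Note the auto-generated DATA__TS-style column names, which come from the unaliased DATA->TS expressions. A variant that aliases the columns and carries the question's TIMESTAMP intent onto the derived stream might look like this (a sketch, assuming the TIMESTAMP/TIMESTAMP_FORMAT properties accept the aliased column, as in the question's extractjsonfield version; TYPE_1_TS is a hypothetical name):
-- hypothetical: aliased columns, with the event timestamp taken from the data
CREATE STREAM TYPE_1_TS WITH (TIMESTAMP='TS', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss.S') AS \
  SELECT DATA->TS AS TS, DATA->A AS A, DATA->B AS B \
  FROM T WHERE TYPE='key1';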
Topics underpinning each KSQL stream:
ksql> LIST TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
---------------------------------------------------------------------------------------------------------
t_raw | true | 1 | 1 | 2 | 2
TYPE_1 | true | 4 | 1 | 0 | 0
TYPE_2 | true | 4 | 1 | 0 | 0
---------------------------------------------------------------------------------------------------------
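One last observation: the derived topics were created with four partitions, not t_raw's one. If the partition count matters downstream, the CSAS can set it explicitly (a sketch, assuming the PARTITIONS property is supported in the WITH clause of this KSQL version; TYPE_1_P1 is a hypothetical name):
-- hypothetical: pin the output topic to a single partition to match t_raw
CREATE STREAM TYPE_1_P1 WITH (PARTITIONS=1) AS \
  SELECT DATA->TS, DATA->A, DATA->B FROM T WHERE TYPE='key1';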