数组的双重展平。 ksqldb 0.8.1
Double flattening of arrays in ksqlDB 0.8.1
数据。
kafkacat -b 127.0.0.1 -t group-topic -P
{"groups":[{"name":"Roberth","surname":"Smith","origin":"England","albums":["Wish","Desintegration"],"group":"The Cure"},{"name":"Peter","surname":"Murphy","origin":"England","albums":["Mask","In The Flat Field"],"group":"Bauhaus"}]}
// 结构流
-- Start consuming from the earliest available offset so that records already
-- present in the topic are picked up by the queries that follow.
SET 'auto.offset.reset' = 'earliest';

-- Source stream over the JSON topic. Only the fields declared below are
-- exposed; other keys present in the payload (e.g. "origin", "group") are
-- not declared.
CREATE STREAM GROUPS_01 (
    GROUPS ARRAY<
        STRUCT<
            ALBUMS ARRAY<VARCHAR>,
            NAME VARCHAR,
            SURNAME VARCHAR
        >
    >
) WITH (
    KAFKA_TOPIC = 'group-topic',
    VALUE_FORMAT = 'JSON'
);
-- First-level flattening only: one output row per element of GROUPS.
-- ALBUMS is still an array on every row at this point.
SELECT
    EXPLODE(GROUPS)->NAME AS NAME,
    EXPLODE(GROUPS)->SURNAME AS SURNAME,
    EXPLODE(GROUPS)->ALBUMS AS ALBUMS
FROM GROUPS_01
EMIT CHANGES;
//我有
NAME SURNAME ALBUMS
Roberth Smith [Wish,Desintegration]
Peter Murphy [Mask,In The Flat Field]
// 我需要
NAME SURNAME ALBUM
Roberth Smith Wish
Roberth Smith Desintegration
Peter Murphy Mask
Peter Murphy In The Flat Field
// 尝试
EXPLODE(groups)->EXPLODE(albums)->album AS album
EXPLODE(albums)->album AS album
为清楚起见,这里是您提供的源数据:
{
"groups": [
{
"name": "Roberth",
"surname": "Smith",
"origin": "England",
"albums": [
"Wish",
"Desintegration"
],
"group": "The Cure"
},
{
"name": "Peter",
"surname": "Murphy",
"origin": "England",
"albums": [
"Mask",
"In The Flat Field"
],
"group": "Bauhaus"
}
]
}
首先展开(EXPLODE)根数组:
ksql> CREATE STREAM EX1A AS
        -- One output row per element of GROUPS, carried as GROUP_SINGLE.
        SELECT EXPLODE(GROUPS) AS GROUP_SINGLE
        FROM GROUPS_01
        EMIT CHANGES;
Message
-----------------------------------
Created query with ID CSAS_EX1A_5
-----------------------------------
这给了我们:
ksql> SELECT * FROM EX1A EMIT CHANGES;
+----------------+-------+-----------------------------------------------------------+
|ROWTIME |ROWKEY |GROUP_SINGLE |
+----------------+-------+-----------------------------------------------------------+
|1585666857714 |null |{ALBUMS=[Wish, Desintegration], NAME=Roberth, SURNAME=Smith|
| | |} |
|1585666857714 |null |{ALBUMS=[Mask, In The Flat Field], NAME=Peter, SURNAME=Murp|
| | |hy} |
现在使用 -> 运算符访问嵌套结构,并展开 ALBUMS 数组:
-- Second-level flattening: read the scalar fields of the GROUP_SINGLE struct
-- with -> and explode its ALBUMS array, giving one row per (person, album).
CREATE STREAM ALBUMS_EXPLODED AS
    SELECT
        GROUP_SINGLE->NAME AS NAME,
        GROUP_SINGLE->SURNAME AS SURNAME,
        EXPLODE(GROUP_SINGLE->ALBUMS) AS ALBUM
    FROM EX1A
    EMIT CHANGES;
ksql> SELECT NAME, SURNAME, ALBUM FROM ALBUMS_EXPLODED EMIT CHANGES;
+-------------------+----------------------+-------------------+
|NAME |SURNAME |ALBUM |
+-------------------+----------------------+-------------------+
|Roberth |Smith |Wish |
|Roberth |Smith |Desintegration |
|Peter |Murphy |Mask |
|Peter |Murphy |In The Flat Field |
数据。
kafkacat -b 127.0.0.1 -t group-topic -P
{"groups":[{"name":"Roberth","surname":"Smith","origin":"England","albums":["Wish","Desintegration"],"group":"The Cure"},{"name":"Peter","surname":"Murphy","origin":"England","albums":["Mask","In The Flat Field"],"group":"Bauhaus"}]}
// 结构流
-- Start consuming from the earliest available offset so that records already
-- present in the topic are picked up by the queries that follow.
SET 'auto.offset.reset' = 'earliest';

-- Source stream over the JSON topic. Only the fields declared below are
-- exposed; other keys present in the payload (e.g. "origin", "group") are
-- not declared.
CREATE STREAM GROUPS_01 (
    GROUPS ARRAY<
        STRUCT<
            ALBUMS ARRAY<VARCHAR>,
            NAME VARCHAR,
            SURNAME VARCHAR
        >
    >
) WITH (
    KAFKA_TOPIC = 'group-topic',
    VALUE_FORMAT = 'JSON'
);
-- First-level flattening only: one output row per element of GROUPS.
-- ALBUMS is still an array on every row at this point.
SELECT
    EXPLODE(GROUPS)->NAME AS NAME,
    EXPLODE(GROUPS)->SURNAME AS SURNAME,
    EXPLODE(GROUPS)->ALBUMS AS ALBUMS
FROM GROUPS_01
EMIT CHANGES;
//我有
NAME SURNAME ALBUMS
Roberth Smith [Wish,Desintegration]
Peter Murphy [Mask,In The Flat Field]
// 我需要
NAME SURNAME ALBUM
Roberth Smith Wish
Roberth Smith Desintegration
Peter Murphy Mask
Peter Murphy In The Flat Field
// 尝试
EXPLODE(groups)->EXPLODE(albums)->album AS album
EXPLODE(albums)->album AS album
为清楚起见,这里是您提供的源数据:
{
"groups": [
{
"name": "Roberth",
"surname": "Smith",
"origin": "England",
"albums": [
"Wish",
"Desintegration"
],
"group": "The Cure"
},
{
"name": "Peter",
"surname": "Murphy",
"origin": "England",
"albums": [
"Mask",
"In The Flat Field"
],
"group": "Bauhaus"
}
]
}
首先展开(EXPLODE)根数组:
ksql> CREATE STREAM EX1A AS
        -- One output row per element of GROUPS, carried as GROUP_SINGLE.
        SELECT EXPLODE(GROUPS) AS GROUP_SINGLE
        FROM GROUPS_01
        EMIT CHANGES;
Message
-----------------------------------
Created query with ID CSAS_EX1A_5
-----------------------------------
这给了我们:
ksql> SELECT * FROM EX1A EMIT CHANGES;
+----------------+-------+-----------------------------------------------------------+
|ROWTIME |ROWKEY |GROUP_SINGLE |
+----------------+-------+-----------------------------------------------------------+
|1585666857714 |null |{ALBUMS=[Wish, Desintegration], NAME=Roberth, SURNAME=Smith|
| | |} |
|1585666857714 |null |{ALBUMS=[Mask, In The Flat Field], NAME=Peter, SURNAME=Murp|
| | |hy} |
现在使用 -> 运算符访问嵌套结构,并展开 ALBUMS 数组:
-- Second-level flattening: read the scalar fields of the GROUP_SINGLE struct
-- with -> and explode its ALBUMS array, giving one row per (person, album).
CREATE STREAM ALBUMS_EXPLODED AS
    SELECT
        GROUP_SINGLE->NAME AS NAME,
        GROUP_SINGLE->SURNAME AS SURNAME,
        EXPLODE(GROUP_SINGLE->ALBUMS) AS ALBUM
    FROM EX1A
    EMIT CHANGES;
ksql> SELECT NAME, SURNAME, ALBUM FROM ALBUMS_EXPLODED EMIT CHANGES;
+-------------------+----------------------+-------------------+
|NAME |SURNAME |ALBUM |
+-------------------+----------------------+-------------------+
|Roberth |Smith |Wish |
|Roberth |Smith |Desintegration |
|Peter |Murphy |Mask |
|Peter |Murphy |In The Flat Field |