Get one ksql stream with values from two ksql streams

I created a master stream whose value is an array. I want to take all of the array fields at once and stream them into a single ksql stream.

First, I create the master stream:

CREATE STREAM runtime_master_stream
     (timestamp BIGINT,
      opcuaObject VARCHAR,
      value array<DOUBLE>)
   WITH (KAFKA_TOPIC='runtime_master', VALUE_FORMAT='JSON');

The output is:

{"ROWTIME":1557317077577,"ROWKEY":"\u0000\u0000\u0001j�T�I","timestamp":1557317069589,"opcuaObject":"DatBetrZ.BetrZStdOM","value":[19.737154,512.0,320.18024,423.87027,399.99384,292.1198,450.821]}

Then I create a new stream to get at the individual array fields:

CREATE STREAM runtime_std_om_all_knife_stream
  WITH (TIMESTAMP='timestamp',
        PARTITIONS=4,
        VALUE_FORMAT='JSON') AS
  SELECT
        timestamp,
        opcuaObject,
        value[0] AS knife1,
        value[1] AS knife02,
        value[2] AS knife03,
        value[3] AS knife04,
        value[4] AS knife05,
        value[5] AS knife06,
        value[6] AS knife07
  FROM RUNTIME_MASTER_STREAM
  WHERE opcuaObject = 'DatBetrZ.BetrZStdOM';

Output:

{"ROWTIME":1557317170312,"ROWKEY":"\u0000\u0000\u0001j�V4�","TIMESTAMP":1557317162337,"OPCUAOBJECT":"DatBetrZ.BetrZStdOM","KNIFE1":19.737154,"KNIFE02":512.0,"KNIFE03":320.18024,"KNIFE04":423.87027,"KNIFE05":400.02216,"KNIFE06":292.1198,}

What I need is output like the following, all in one ksql stream:

{"ROWTIME":1557317170312,"ROWKEY":"\u0000\u0000\u0001j�V4�","TIMESTAMP":1557317162337,"OPCUAOBJECT":"Knife1","VALUE":19.737154}

{"ROWTIME":1557317170313,"ROWKEY":"\u0000\u0000\u0001j�V4�","TIMESTAMP":1557317162337,"OPCUAOBJECT":"Knife02","VALUE":19.737154}

{"ROWTIME":1557317170312,"ROWKEY":"\u0000\u0000\u0001j�V4�","TIMESTAMP":1557317162337,"OPCUAOBJECT":"Knife03","VALUE":19.737154}

{"ROWTIME":1557317170313,"ROWKEY":"\u0000\u0000\u0001j�V4�","TIMESTAMP":1557317162337,"OPCUAOBJECT":"Knife04","VALUE":19.737154}

{"ROWTIME":1557317170312,"ROWKEY":"\u0000\u0000\u0001j�V4�","TIMESTAMP":1557317162337,"OPCUAOBJECT":"Knife05","VALUE":19.737154}

{"ROWTIME":1557317170313,"ROWKEY":"\u0000\u0000\u0001j�V4�","TIMESTAMP":1557317162337,"OPCUAOBJECT":"Knife06","VALUE":39.737154}

Any idea how to get this output from a single stream in Kafka KSQL?
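To make the difference between the two shapes concrete, here is a plain-Python model (not ksql) applied to the sample record from the master stream. `project_wide` mirrors the per-index SELECT above (one wide row per input record, one column per index), while `explode` produces the one-row-per-element shape asked for. Both function names and the zero-padded `Knife01`, `Knife02`, ... naming are hypothetical; the question itself uses `Knife1`, `Knife02`, and so on.

```python
from typing import Any

# Sample record from runtime_master_stream (values taken from the output above).
RECORD = {
    "timestamp": 1557317069589,
    "opcuaObject": "DatBetrZ.BetrZStdOM",
    "value": [19.737154, 512.0, 320.18024, 423.87027, 399.99384, 292.1198, 450.821],
}


def project_wide(rec: dict[str, Any]) -> dict[str, Any]:
    """Model of the per-index SELECT: one wide row, one column per array index."""
    row = {"TIMESTAMP": rec["timestamp"], "OPCUAOBJECT": rec["opcuaObject"]}
    for i, v in enumerate(rec["value"]):
        row[f"KNIFE{i + 1:02d}"] = v  # hypothetical column naming
    return row


def explode(rec: dict[str, Any]) -> list[dict[str, Any]]:
    """Model of the desired output: one narrow row per array element."""
    return [
        {"TIMESTAMP": rec["timestamp"], "OPCUAOBJECT": f"Knife{i + 1:02d}", "VALUE": v}
        for i, v in enumerate(rec["value"])
    ]


if __name__ == "__main__":
    print(project_wide(RECORD))
    for row in explode(RECORD):
        print(row)
```

The wide form has a fixed number of columns decided when the query is written; the exploded form has as many rows as the array has elements, which is why it cannot be produced by a single projection.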

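What you're looking for is an EXPLODE/UNNEST function. It isn't currently available, but there is an existing issue for it that you can upvote if you think it would be useful.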

A workaround could look like the following, where you brute-force it by using INSERT INTO to populate the target stream from every possible index of the array:
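The workaround amounts to a union of one query per array index, each writing into the same target and each filtering on `VALUE[i] IS NOT NULL`. The plain-Python model below (not ksql; `brute_force_explode` is a hypothetical name) runs that union over the five test records used in this answer. It treats an index past the end of an array as null, which matches the `null` cells in the SELECT output further down.

```python
from typing import Any

# The five test records used in this answer (message values only).
CAR_DATA = [
    {"car": "Oldsmobile", "value": [68.93, 53.58]},
    {"car": "Mercury", "value": [60.09, 69.07, 63.77, 63.13]},
    {"car": "Volkswagen", "value": [59.77, 6.94, 97.7, 30.86, 16.9]},
    {"car": "Nissan", "value": [13.32]},
    {"car": "Hyundai", "value": [12.92]},
]


def brute_force_explode(records: list[dict[str, Any]], max_index: int) -> list[dict[str, Any]]:
    """Model one query per array index 0..max_index, all writing to one target.

    Index 0 stands for the CREATE STREAM ... AS SELECT, indexes 1..max_index
    for the INSERT INTO statements.
    """
    out = []
    for i in range(max_index + 1):
        for rec in records:
            arr = rec["value"]
            v = arr[i] if i < len(arr) else None  # out-of-range index -> null
            if v is not None:  # WHERE VALUE[i] IS NOT NULL
                out.append({"CAR": rec["car"], "SOURCE": f"Sensor {i:02d}", "VALUE": v})
    return out


if __name__ == "__main__":
    print(len(brute_force_explode(CAR_DATA, max_index=5)))  # all indexes covered
    print(len(brute_force_explode(CAR_DATA, max_index=2)))  # indexes 3 and up lost
```

Row order in the model is not meaningful (in the real output the rows from the separate queries arrive interleaved). What matters is coverage: any element at an index you did not write a statement for is silently dropped, so the number of statements has to match the longest array you expect.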

Populate a Kafka topic with some test data:

$ curl "https://api.mockaroo.com/api/440970e0?count=5&key=ff7856d0" | \
    kafkacat -P -b localhost -t car_data_01

Inspect the data in KSQL:

ksql> PRINT 'car_data_01' FROM BEGINNING;
Format:JSON
{"ROWTIME":1557392065409,"ROWKEY":"null","timestamp":"1533200557","car":"Oldsmobile","value":[68.93,53.58]}
{"ROWTIME":1557392065409,"ROWKEY":"null","timestamp":"1548442477","car":"Mercury","value":[60.09,69.07,63.77,63.13]}
{"ROWTIME":1557392065409,"ROWKEY":"null","timestamp":"1544928225","car":"Volkswagen","value":[59.77,6.94,97.7,30.86,16.9]}
{"ROWTIME":1557392065409,"ROWKEY":"null","timestamp":"1545383393","car":"Nissan","value":[13.32]}
{"ROWTIME":1557392065412,"ROWKEY":"null","timestamp":"1552825010","car":"Hyundai","value":[12.92]}
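The Mockaroo URL above is tied to a particular schema and API key, so it may not be reachable for you. As an assumption-labeled alternative, this small script prints the same five message values shown by PRINT (without the ROWTIME/ROWKEY fields, which ksql adds itself), one compact JSON document per line; kafkacat's producer mode splits stdin on newlines by default. The function name `to_lines` is hypothetical.

```python
import json

# Message values of the five records shown by PRINT above.
RECORDS = [
    {"timestamp": "1533200557", "car": "Oldsmobile", "value": [68.93, 53.58]},
    {"timestamp": "1548442477", "car": "Mercury", "value": [60.09, 69.07, 63.77, 63.13]},
    {"timestamp": "1544928225", "car": "Volkswagen", "value": [59.77, 6.94, 97.7, 30.86, 16.9]},
    {"timestamp": "1545383393", "car": "Nissan", "value": [13.32]},
    {"timestamp": "1552825010", "car": "Hyundai", "value": [12.92]},
]


def to_lines(records: list[dict]) -> list[str]:
    """Serialize each record as one compact JSON document per line."""
    return [json.dumps(r, separators=(",", ":")) for r in records]


if __name__ == "__main__":
    for line in to_lines(RECORDS):
        print(line)
```

Saved as, say, `gen_car_data.py`, it could be piped the same way: `python3 gen_car_data.py | kafkacat -P -b localhost -t car_data_01`.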

Create a stream over the data:

CREATE STREAM CAR_DATA (timestamp BIGINT, CAR VARCHAR, VALUE ARRAY<DOUBLE>) WITH (KAFKA_TOPIC='car_data_01', VALUE_FORMAT='JSON');
ksql> SELECT TIMESTAMP, CAR, VALUE[0], VALUE[1], VALUE[2] FROM CAR_DATA;
1533200557 | Oldsmobile | 68.93 | 53.58 | null
1548442477 | Mercury | 60.09 | 69.07 | 63.77
1544928225 | Volkswagen | 59.77 | 6.94 | 97.7
1545383393 | Nissan | 13.32 | null | null
1552825010 | Hyundai | 12.92 | null | null

Create the output stream, initially containing only the zero-index element of the array:

CREATE STREAM CAR_DATA_EXPLODED AS SELECT TIMESTAMP, CAR, 'Sensor 00' AS SOURCE, VALUE[0] AS VALUE FROM CAR_DATA WHERE VALUE[0] IS NOT NULL;
ksql> SELECT * FROM CAR_DATA_EXPLODED;
1557392065409 | null | 1544928225 | Volkswagen | Sensor 00 | 59.77
1557392065409 | null | 1545383393 | Nissan | Sensor 00 | 13.32
1557392065409 | null | 1533200557 | Oldsmobile | Sensor 00 | 68.93
1557392065412 | null | 1552825010 | Hyundai | Sensor 00 | 12.92
1557392065409 | null | 1548442477 | Mercury | Sensor 00 | 60.09

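Insert the remaining array indexes into the new stream. Here the zero-index step is repeated as a fresh stream, CAR_DATA_EXPLODED_00, which then receives the other indexes via INSERT INTO: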

CREATE STREAM CAR_DATA_EXPLODED_00 AS SELECT TIMESTAMP, CAR, 'Sensor 00' AS SOURCE, VALUE[0] AS VALUE FROM CAR_DATA WHERE VALUE[0] IS NOT NULL;
INSERT INTO CAR_DATA_EXPLODED_00 SELECT TIMESTAMP, CAR, 'Sensor 01' AS SOURCE, VALUE[1] AS VALUE FROM CAR_DATA WHERE VALUE[1] IS NOT NULL;
INSERT INTO CAR_DATA_EXPLODED_00 SELECT TIMESTAMP, CAR, 'Sensor 02' AS SOURCE, VALUE[2] AS VALUE FROM CAR_DATA WHERE VALUE[2] IS NOT NULL;
INSERT INTO CAR_DATA_EXPLODED_00 SELECT TIMESTAMP, CAR, 'Sensor 03' AS SOURCE, VALUE[3] AS VALUE FROM CAR_DATA WHERE VALUE[3] IS NOT NULL;
INSERT INTO CAR_DATA_EXPLODED_00 SELECT TIMESTAMP, CAR, 'Sensor 04' AS SOURCE, VALUE[4] AS VALUE FROM CAR_DATA WHERE VALUE[4] IS NOT NULL;
INSERT INTO CAR_DATA_EXPLODED_00 SELECT TIMESTAMP, CAR, 'Sensor 05' AS SOURCE, VALUE[5] AS VALUE FROM CAR_DATA WHERE VALUE[5] IS NOT NULL;
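Because these INSERT INTO statements differ only in the index, they can be generated rather than typed by hand. A minimal sketch: `insert_statements` and its defaults are hypothetical helpers, not part of ksql. Keep in mind that each INSERT INTO starts its own persistent query, so the chosen maximum index also sets how many queries you end up running.

```python
def insert_statements(max_index: int,
                      target: str = "CAR_DATA_EXPLODED_00",
                      source: str = "CAR_DATA") -> list[str]:
    """Build one INSERT INTO ... SELECT statement per array index 1..max_index."""
    stmts = []
    for i in range(1, max_index + 1):
        stmts.append(
            f"INSERT INTO {target} SELECT TIMESTAMP, CAR, 'Sensor {i:02d}' AS SOURCE, "
            f"VALUE[{i}] AS VALUE FROM {source} WHERE VALUE[{i}] IS NOT NULL;"
        )
    return stmts


if __name__ == "__main__":
    # Print the statements so they can be pasted into the ksql CLI.
    for stmt in insert_statements(5):
        print(stmt)
```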

Check the exploded data:

ksql> SELECT * FROM CAR_DATA_EXPLODED_00;
1557392065409 | null | 1533200557 | Oldsmobile | Sensor 00 | 68.93
1557392065409 | null | 1545383393 | Nissan | Sensor 00 | 13.32
1557392065409 | null | 1544928225 | Volkswagen | Sensor 00 | 59.77
1557392065409 | null | 1548442477 | Mercury | Sensor 02 | 63.77
1557392065409 | null | 1544928225 | Volkswagen | Sensor 03 | 30.86
1557392065409 | null | 1544928225 | Volkswagen | Sensor 04 | 16.9
1557392065409 | null | 1533200557 | Oldsmobile | Sensor 01 | 53.58
1557392065409 | null | 1544928225 | Volkswagen | Sensor 02 | 97.7
1557392065412 | null | 1552825010 | Hyundai | Sensor 00 | 12.92
1557392065409 | null | 1548442477 | Mercury | Sensor 01 | 69.07
1557392065409 | null | 1548442477 | Mercury | Sensor 00 | 60.09
1557392065409 | null | 1544928225 | Volkswagen | Sensor 01 | 6.94
1557392065409 | null | 1548442477 | Mercury | Sensor 03 | 63.13
ksql> SELECT * FROM CAR_DATA_EXPLODED_00 WHERE SOURCE='Sensor 03';
1557392065409 | null | 1544928225 | Volkswagen | Sensor 03 | 30.86
1557392065409 | null | 1548442477 | Mercury | Sensor 03 | 63.13