KSQL: How to join a sensor data topic with an indicator topic without a common ID
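I have data streaming into my topics from a sensor server that I have no control over.
In topic A, multiple sensor data payloads arrive (a, b, c, d, ...).
In topic B, indicator messages arrive (like 1, 2, ...) that tell me that, from now on, the incoming sensor data from topic A belongs to the new object x instead of x-1.
I want to associate the data from topic A with the object that is current in topic B at that time.
I'm quite new to KSQL and stream processing, so I don't know whether this is possible. It feels like there might be a very simple solution, but I haven't found anything similar in the examples.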
Edit:
The sensor data (topic A) might look like this:
sensorPath | timestamp | value
simulation/machine/plc/sensor-1 | 1 | 7.0
simulation/machine/plc/sensor-2 | 1 | 2.0
simulation/machine/plc/sensor-1 | 2 | 6.0
simulation/machine/plc/sensor-2 | 2 | 1.0
...
simulation/machine/plc/sensor-1 | 10 | 10.0
simulation/machine/plc/sensor-2 | 10 | 12.0
The indicator data (topic B) might look like this:
informationPath | timestamp | WorkpieceID
simulation/informationString | 1 | 0020181
simulation/informationString | 10 | 0020182
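Basically, I want to match the sensor data to the corresponding workpiece in a new topic/stream. Newly arriving sensor data always belongs to the latest information string/workpiece.
So topic C should look like this:
sensorPath | SensorTimestamp | value | WorkpieceID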
simulation/machine/plc/sensor-1 | 1 | 7.0 | 0020181
simulation/machine/plc/sensor-2 | 1 | 2.0 | 0020181
simulation/machine/plc/sensor-1 | 2 | 6.0 | 0020181
simulation/machine/plc/sensor-2 | 2 | 1.0 | 0020181
...
simulation/machine/plc/sensor-1 | 10 | 10.0 | 0020182
simulation/machine/plc/sensor-2 | 10 | 12.0 | 0020182
So I need something like a join on topicA.timestamp >= current(topicB.timestamp)?!
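Expressed outside KSQL, the semantics being asked for is an "as-of" lookup: each sensor reading takes the WorkpieceID of the most recent indicator whose timestamp is at or before the reading's timestamp. The following is only a plain-Python illustration of that rule, not KSQL; the tuple layout and the function name `as_of_join` are made up for this sketch, and the rows are the sample data above.

```python
from bisect import bisect_right

# Topic B: indicator events as (timestamp, WorkpieceID), sorted by timestamp
indicators = [(1, "0020181"), (10, "0020182")]

# Topic A: sensor events as (sensorPath, timestamp, value)
sensors = [
    ("simulation/machine/plc/sensor-1", 1, 7.0),
    ("simulation/machine/plc/sensor-2", 1, 2.0),
    ("simulation/machine/plc/sensor-1", 2, 6.0),
    ("simulation/machine/plc/sensor-2", 2, 1.0),
    ("simulation/machine/plc/sensor-1", 10, 10.0),
    ("simulation/machine/plc/sensor-2", 10, 12.0),
]


def as_of_join(sensors, indicators):
    """Tag each reading with the WorkpieceID of the latest indicator whose
    timestamp is <= the reading's timestamp (None if there is no such indicator)."""
    starts = [ts for ts, _ in indicators]
    result = []
    for path, ts, value in sensors:
        i = bisect_right(starts, ts)  # number of indicators with timestamp <= ts
        workpiece = indicators[i - 1][1] if i > 0 else None
        result.append((path, ts, value, workpiece))
    return result


for row in as_of_join(sensors, indicators):
    print(" | ".join(str(col) for col in row))
```

Running it prints the rows of topic C listed above. A reading that arrives before any indicator gets `None`, which corresponds to an empty WorkpieceID.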
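Yes, you can do this with KSQL. Here's a working example. If you want to reproduce it, I'm using this docker-compose file as my test environment.
First, I'll populate some test data based on the sample you provided. I've made up the timestamps from the current epoch time, plus offsets of 1 minute and 10 minutes:
Sensor test data: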
docker run --rm --interactive --network cos_default confluentinc/cp-kafkacat kafkacat -b kafka:29092 -t sensor -P <<EOF
{"sensorPath":"simulation/machine/plc/sensor-1","value":7.0,"timestamp":1541623171000}
{"sensorPath":"simulation/machine/plc/sensor-2","value":2.0,"timestamp":1541623171000}
{"sensorPath":"simulation/machine/plc/sensor-1","value":6.0,"timestamp":1541623231000}
{"sensorPath":"simulation/machine/plc/sensor-2","value":1.0,"timestamp":1541623231000}
{"sensorPath":"simulation/machine/plc/sensor-1","value":10.0,"timestamp":1541623771000}
{"sensorPath":"simulation/machine/plc/sensor-2","value":12.0,"timestamp":1541623771000}
EOF
Indicator test data:
docker run --rm --interactive --network cos_default confluentinc/cp-kafkacat kafkacat -b kafka:29092 -t indicator -P << EOF
{"informationPath":"simulation/informationString","WorkpieceID":"0020181","timestamp":1541623171000}
{"informationPath":"simulation/informationString","WorkpieceID":"0020182","timestamp":1541623771000}
EOF
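If you want to regenerate these payloads, for example with different offsets, a short Python script can produce the same one-JSON-object-per-line input. This is only a convenience sketch: `BASE_MS` and the offsets are read off the sample data above, and the variable names are made up.

```python
import json

BASE_MS = 1541623171000  # base timestamp of the sample data, in epoch milliseconds

# (offset from BASE_MS in ms, value for sensor-1, value for sensor-2)
readings = [(0, 7.0, 2.0), (60_000, 6.0, 1.0), (600_000, 10.0, 12.0)]


def compact(record):
    # Same compact JSON form as the kafkacat input (no spaces after separators)
    return json.dumps(record, separators=(",", ":"))


sensor_lines = []
for offset, v1, v2 in readings:
    for n, v in ((1, v1), (2, v2)):
        sensor_lines.append(compact({
            "sensorPath": f"simulation/machine/plc/sensor-{n}",
            "value": v,
            "timestamp": BASE_MS + offset,
        }))

indicator_lines = [
    compact({"informationPath": "simulation/informationString",
             "WorkpieceID": wid, "timestamp": BASE_MS + offset})
    for offset, wid in [(0, "0020181"), (600_000, "0020182")]
]

print("\n".join(sensor_lines))
print("\n".join(indicator_lines))
```

Each list can be pasted into the matching `kafkacat -P` command in place of the heredoc body.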
Now I launch the KSQL CLI:
docker run --network cos_default --interactive --tty --rm \
confluentinc/cp-ksql-cli:5.0.0 \
http://ksql-server:8088
In KSQL we can inspect the source data in the topics:
ksql> PRINT 'sensor' FROM BEGINNING;
Format:JSON
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-1","value":7.0,"timestamp":1541623171000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-2","value":2.0,"timestamp":1541623171000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-1","value":6.0,"timestamp":1541623231000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-2","value":1.0,"timestamp":1541623231000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-1","value":10.0,"timestamp":1541623771000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-2","value":12.0,"timestamp":1541623771000}
ksql> PRINT 'indicator' FROM BEGINNING;
Format:JSON
{"ROWTIME":1541624851692,"ROWKEY":"null","informationPath":"simulation/informationString","WorkpieceID":"0020181","timestamp":1541623171000}
{"ROWTIME":1541624851692,"ROWKEY":"null","informationPath":"simulation/informationString","WorkpieceID":"0020182","timestamp":1541623771000}
Now we register the topics for use in KSQL, declaring their schemas:
ksql> CREATE STREAM SENSOR (SENSORPATH VARCHAR, VALUE DOUBLE, TIMESTAMP BIGINT) WITH (VALUE_FORMAT='JSON',KAFKA_TOPIC='sensor',TIMESTAMP='timestamp');
Message
----------------
Stream created
----------------
ksql> CREATE STREAM INDICATOR (INFORMATIONPATH VARCHAR, WORKPIECEID VARCHAR, TIMESTAMP BIGINT) WITH (VALUE_FORMAT='JSON',KAFKA_TOPIC='indicator',TIMESTAMP='timestamp');
Message
----------------
Stream created
----------------
We can query the KSQL streams we've just created:
ksql> SET 'auto.offset.reset' = 'earliest';
ksql> SELECT ROWTIME, timestamp, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), TIMESTAMPTOSTRING(timestamp, 'yyyy-MM-dd HH:mm:ss Z') , sensorpath, value FROM sensor;
1541623171000 | 1541623171000 | 2018-11-07 20:39:31 +0000 | 2018-11-07 20:39:31 +0000 | simulation/machine/plc/sensor-1 | 7.0
1541623171000 | 1541623171000 | 2018-11-07 20:39:31 +0000 | 2018-11-07 20:39:31 +0000 | simulation/machine/plc/sensor-2 | 2.0
1541623231000 | 1541623231000 | 2018-11-07 20:40:31 +0000 | 2018-11-07 20:40:31 +0000 | simulation/machine/plc/sensor-1 | 6.0
1541623231000 | 1541623231000 | 2018-11-07 20:40:31 +0000 | 2018-11-07 20:40:31 +0000 | simulation/machine/plc/sensor-2 | 1.0
1541623771000 | 1541623771000 | 2018-11-07 20:49:31 +0000 | 2018-11-07 20:49:31 +0000 | simulation/machine/plc/sensor-1 | 10.0
1541623771000 | 1541623771000 | 2018-11-07 20:49:31 +0000 | 2018-11-07 20:49:31 +0000 | simulation/machine/plc/sensor-2 | 12.0
ksql> SELECT ROWTIME, timestamp, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), TIMESTAMPTOSTRING(timestamp, 'yyyy-MM-dd HH:mm:ss Z') , informationPath, WorkpieceID FROM indicator;
1541623171000 | 1541623171000 | 2018-11-07 20:39:31 +0000 | 2018-11-07 20:39:31 +0000 | simulation/informationString | 0020181
1541623771000 | 1541623771000 | 2018-11-07 20:49:31 +0000 | 2018-11-07 20:49:31 +0000 | simulation/informationString | 0020182
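To sanity-check the formatted columns, here is a rough Python equivalent of the `TIMESTAMPTOSTRING` calls, assuming UTC as in the output above (the Java pattern `yyyy-MM-dd HH:mm:ss Z` corresponds to the `strftime` pattern `%Y-%m-%d %H:%M:%S %z`). The helper name `timestamp_to_string` is made up. It also makes the spacing of the test data visible: the three timestamps are 1 minute and 10 minutes apart.

```python
from datetime import datetime, timezone


def timestamp_to_string(epoch_ms: int) -> str:
    """Rough analogue of TIMESTAMPTOSTRING(ts, 'yyyy-MM-dd HH:mm:ss Z') in UTC."""
    dt = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M:%S %z")


for ts in (1541623171000, 1541623231000, 1541623771000):
    print(ts, "|", timestamp_to_string(ts))
```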
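Note that the ROWTIME of the STREAM differs from the ROWTIME in the PRINT output. This is because the PRINT output shows the Kafka message timestamp, whereas in the STREAM we override the timestamp in the WITH clause and use the timestamp column from the message payload itself instead.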
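In other words, the `TIMESTAMP` property only changes which value KSQL treats as the event time (ROWTIME); the messages in the `sensor` topic are not rewritten. A tiny Python model of that choice follows; the function `choose_rowtime` and its parameters are hypothetical, for illustration only, and the values come from the PRINT and SELECT output above.

```python
def choose_rowtime(kafka_timestamp_ms, payload, timestamp_column=None):
    """Model of how a stream's ROWTIME is picked: the payload column named in
    WITH (TIMESTAMP=...) when one is configured, otherwise the Kafka message timestamp."""
    if timestamp_column is not None:
        return payload[timestamp_column]
    return kafka_timestamp_ms


# First record of the 'sensor' topic, as shown by PRINT above
kafka_ts = 1541624847072
payload = {"sensorPath": "simulation/machine/plc/sensor-1", "value": 7.0,
           "timestamp": 1541623171000}

print(choose_rowtime(kafka_ts, payload))               # no override: message timestamp (PRINT)
print(choose_rowtime(kafka_ts, payload, "timestamp"))  # override: payload column (SELECT)
```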
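To join the two topics, we're going to do two things:
- Create an artificial key on which to join them, since none currently exists in the data. We'll also apply this new column as the key of the Kafka message (which is required for the join).
- Model the 'indicator' event stream as a KSQL table. This lets us query the current state of the WorkpieceID value based on the timestamp.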
To add the artificial join key, simply select a constant, alias it with an AS clause, and use it as the message key with PARTITION BY:
ksql> CREATE STREAM SENSOR_KEYED AS SELECT sensorPath, value, 'X' AS JOIN_KEY FROM sensor PARTITION BY JOIN_KEY;
Message
----------------------------
Stream created and running
----------------------------
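Conceptually, this statement rewrites every message so that its key and its new JOIN_KEY column hold the same constant. A minimal Python model follows; the function `rekey` and the (key, value) tuple layout are illustrative assumptions, not KSQL internals.

```python
def rekey(records, constant="X"):
    """Model of SELECT ..., 'X' AS JOIN_KEY ... PARTITION BY JOIN_KEY:
    add a constant JOIN_KEY column and also use it as the message key."""
    rekeyed = []
    for _old_key, value in records:
        new_value = {**value, "JOIN_KEY": constant}  # copy; the input is not mutated
        rekeyed.append((new_value["JOIN_KEY"], new_value))
    return rekeyed


# Source messages have no key (ROWKEY is null in PRINT 'sensor')
sensor = [
    (None, {"SENSORPATH": "simulation/machine/plc/sensor-1", "VALUE": 7.0}),
    (None, {"SENSORPATH": "simulation/machine/plc/sensor-2", "VALUE": 2.0}),
]
for key, value in rekey(sensor):
    print(key, value)
```

Because every record ends up with the same key, the table built later from the indicator stream holds exactly one row, and every sensor reading looks up that same row.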
Out of interest, we can inspect the resulting Kafka topic that has been created:
ksql> PRINT SENSOR_KEYED FROM BEGINNING;
Format:JSON
{"ROWTIME":1541623171000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-1","VALUE":7.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623171000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-2","VALUE":2.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623231000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-1","VALUE":6.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623231000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-2","VALUE":1.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623771000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-1","VALUE":10.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623771000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-2","VALUE":12.0,"JOIN_KEY":"X"}
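Note that ROWKEY is now the JOIN_KEY value, rather than the null seen in the PRINT 'sensor' output above. If you omit PARTITION BY, the JOIN_KEY column is still added, but the messages remain unkeyed, which is not what we want for the join to work.
Now we re-key the indicator data too: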
ksql> CREATE STREAM INDICATOR_KEYED AS SELECT informationPath, WorkpieceID, 'X' as JOIN_KEY FROM indicator PARTITION BY JOIN_KEY;
Message
----------------------------
Stream created and running
----------------------------
ksql> PRINT 'INDICATOR_KEYED' FROM BEGINNING;
Format:JSON
{"ROWTIME":1541623171000,"ROWKEY":"X","INFORMATIONPATH":"simulation/informationString","WORKPIECEID":"0020181","JOIN_KEY":"X"}
{"ROWTIME":1541623771000,"ROWKEY":"X","INFORMATIONPATH":"simulation/informationString","WORKPIECEID":"0020182","JOIN_KEY":"X"}
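With the indicator data re-keyed, we can now register it as a KSQL table. In a table, KSQL returns the state for each key, rather than every event. We're using this to determine which WorkpieceID to associate with the sensor readings, based on timestamp.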
ksql> CREATE TABLE INDICATOR_STATE (JOIN_KEY VARCHAR, informationPath varchar, WorkpieceID varchar) with (value_format='json',kafka_topic='INDICATOR_KEYED',KEY='JOIN_KEY');
Message
---------------
Table created
---------------
Querying the table shows a single value, which is the current state:
ksql> SELECT * FROM INDICATOR_STATE;
1541623771000 | X | X | simulation/informationString | 0020182
If you were to send another message to the indicator topic at this point, the table's state would update and you would see a new row emitted from the SELECT.
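A table behaves like a dictionary that is updated by each incoming message for its key. As a plain-Python sketch (the function name `materialize`, the record layout, and the extra WorkpieceID "0020183" are assumptions for illustration), the two INDICATOR_KEYED records collapse to the single row shown above, and a further message just overwrites it. The null-key branch reflects, as an assumption of this model, that Kafka Streams (which KSQL runs on) skips null-key records when building a table, which is one way to see why the unkeyed `indicator` topic could not be used directly.

```python
def materialize(changelog):
    """Model of a KSQL TABLE over a keyed topic: a later record for a key
    overwrites the earlier one, so only the latest value per key is kept."""
    state = {}
    for key, value in changelog:
        if key is None:
            # Assumption for this model: null-key records are skipped
            continue
        state[key] = value
    return state


# Records of the INDICATOR_KEYED topic, in order (see PRINT above)
indicator_keyed = [
    ("X", {"INFORMATIONPATH": "simulation/informationString", "WORKPIECEID": "0020181"}),
    ("X", {"INFORMATIONPATH": "simulation/informationString", "WORKPIECEID": "0020182"}),
]
print(materialize(indicator_keyed))

# Sending another indicator message replaces the state for key "X"
# ("0020183" is a made-up WorkpieceID for this example)
indicator_keyed.append(
    ("X", {"INFORMATIONPATH": "simulation/informationString", "WORKPIECEID": "0020183"}))
print(materialize(indicator_keyed))
```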
Finally, we can do a stream-table join and persist the result to a new topic:
ksql> CREATE STREAM SENSOR_ENRICHED AS SELECT S.SENSORPATH, TIMESTAMPTOSTRING(S.ROWTIME, 'yyyy-MM-dd HH:mm:ss Z') AS SENSOR_TIMESTAMP, S.VALUE, I.WORKPIECEID FROM SENSOR_KEYED S LEFT JOIN INDICATOR_STATE I ON S.JOIN_KEY=I.JOIN_KEY;
Message
----------------------------
Stream created and running
----------------------------
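To make the join's behaviour concrete, here is a simplified Python model of a stream-table LEFT JOIN. The event tuple format and the function `enrich` are made up for this sketch; KSQL's real implementation runs on Kafka Streams and is not shown here. Only sensor (stream-side) events produce output rows; indicator events just update the table.

```python
def enrich(events):
    """Model of SENSOR_KEYED S LEFT JOIN INDICATOR_STATE I ON S.JOIN_KEY = I.JOIN_KEY.

    `events` is a single sequence in processing order, mixing
    ("indicator", join_key, workpiece_id) and ("sensor", join_key, (path, value)).
    """
    table = {}    # current contents of INDICATOR_STATE: JOIN_KEY -> WORKPIECEID
    output = []   # rows written to SENSOR_ENRICHED
    for kind, join_key, payload in events:
        if kind == "indicator":
            # A table update changes the state but emits no joined row by itself
            table[join_key] = payload
        else:
            path, value = payload
            # Each sensor event looks up the table's value at that moment;
            # LEFT JOIN keeps the reading even if nothing is there yet (None)
            output.append((path, value, table.get(join_key)))
    return output


events = [
    ("indicator", "X", "0020181"),
    ("sensor", "X", ("simulation/machine/plc/sensor-1", 7.0)),
    ("sensor", "X", ("simulation/machine/plc/sensor-2", 2.0)),
    ("indicator", "X", "0020182"),
    ("sensor", "X", ("simulation/machine/plc/sensor-1", 10.0)),
]
for row in enrich(events):
    print(row)
```

The model also shows the caveat of this approach: a reading picks up whatever WorkpieceID the table holds when that reading is processed, so the output is only correct if each indicator event is processed before the sensor readings that belong to it.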
Inspect the new stream:
ksql> DESCRIBE SENSOR_ENRICHED;
Name : SENSOR_ENRICHED
Field | Type
----------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
SENSORPATH | VARCHAR(STRING)
SENSOR_TIMESTAMP | VARCHAR(STRING)
VALUE | DOUBLE
WORKPIECEID | VARCHAR(STRING)
----------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
Query the new stream:
ksql> SELECT SENSORPATH, SENSOR_TIMESTAMP, VALUE, WORKPIECEID FROM SENSOR_ENRICHED;
simulation/machine/plc/sensor-1 | 2018-11-07 20:39:31 +0000 | 7.0 | 0020181
simulation/machine/plc/sensor-2 | 2018-11-07 20:39:31 +0000 | 2.0 | 0020181
simulation/machine/plc/sensor-1 | 2018-11-07 20:40:31 +0000 | 6.0 | 0020181
simulation/machine/plc/sensor-2 | 2018-11-07 20:40:31 +0000 | 1.0 | 0020181
simulation/machine/plc/sensor-1 | 2018-11-07 20:49:31 +0000 | 10.0 | 0020182
simulation/machine/plc/sensor-2 | 2018-11-07 20:49:31 +0000 | 12.0 | 0020182
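Since this is KSQL, the SENSOR_ENRICHED stream (and the underlying topic of the same name) will be populated continuously, driven by events arriving on the sensor topic and reflecting the state built from the events sent to the indicator topic.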