Save last record according to a composite key. ksqlDB 0.6.0

I have a Kafka topic (ksqldb_topic_01) with the following stream of data:

% Reached end of topic ksqldb_topic_01 [0] at offset 213
{"city":"Sevilla","temperature":20,"sensorId":"sensor03"}
% Reached end of topic ksqldb_topic_01 [0] at offset 214
{"city":"Madrid","temperature":5,"sensorId":"sensor03"}
% Reached end of topic ksqldb_topic_01 [0] at offset 215
{"city":"Sevilla","temperature":10,"sensorId":"sensor01"}
% Reached end of topic ksqldb_topic_01 [0] at offset 216
{"city":"Valencia","temperature":15,"sensorId":"sensor03"}
% Reached end of topic ksqldb_topic_01 [0] at offset 217
{"city":"Sevilla","temperature":15,"sensorId":"sensor01"}
% Reached end of topic ksqldb_topic_01 [0] at offset 218
{"city":"Madrid","temperature":20,"sensorId":"sensor03"}
% Reached end of topic ksqldb_topic_01 [0] at offset 219
{"city":"Valencia","temperature":15,"sensorId":"sensor02"}
% Reached end of topic ksqldb_topic_01 [0] at offset 220
{"city":"Sevilla","temperature":5,"sensorId":"sensor02"}
% Reached end of topic ksqldb_topic_01 [0] at offset 221
{"city":"Sevilla","temperature":5,"sensorId":"sensor01"}
% Reached end of topic ksqldb_topic_01 [0] at offset 222

I want to save in a table the last value that arrives on the topic, for each city and sensorId.

In my ksqlDB I have created the following table:

CREATE TABLE ultimo_resgistro (city VARCHAR, sensorId VARCHAR, temperature INTEGER) 
    WITH (KAFKA_TOPIC='ksqldb_topic_01', VALUE_FORMAT='json', KEY='sensorId,city');
DESCRIBE EXTENDED ULTIMO_RESGISTRO;

Name                 : ULTIMO_RESGISTRO
Type                 : TABLE
Key field            : SENSORID
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : ksqldb_topic_01 (partitions: 1, replication: 1)

 Field       | Type                      
-----------------------------------------
 ROWTIME     | BIGINT           (system) 
 ROWKEY      | VARCHAR(STRING)  (system) 
 CITY        | VARCHAR(STRING)           
 SENSORID    | VARCHAR(STRING)           
 TEMPERATURE | INTEGER                   
-----------------------------------------

To check that the data is being processed, I run:

select * from ultimo_resgistro emit changes;
+------------------+------------------+------------------+------------------+------------------+
|ROWTIME           |ROWKEY            |CITY              |SENSORID          |TEMPERATURE       |
+------------------+------------------+------------------+------------------+------------------+
key cannot be null
Query terminated

The problem is that you need to set the key of the Kafka message correctly. You also can't specify two fields in the KEY clause. Read more about this here.

Here's an example of how to do it.

First, load the test data:

kafkacat -b kafka-1:39092 -P -t ksqldb_topic_01 <<EOF
{"city":"Madrid","temperature":20,"sensorId":"sensor03"}
{"city":"Madrid","temperature":5,"sensorId":"sensor03"}
{"city":"Sevilla","temperature":10,"sensorId":"sensor01"}
{"city":"Sevilla","temperature":15,"sensorId":"sensor01"}
{"city":"Sevilla","temperature":20,"sensorId":"sensor03"}
{"city":"Sevilla","temperature":5,"sensorId":"sensor01"}
{"city":"Sevilla","temperature":5,"sensorId":"sensor02"}
{"city":"Valencia","temperature":15,"sensorId":"sensor02"}
{"city":"Valencia","temperature":15,"sensorId":"sensor03"}
EOF

Now declare a schema on the topic in ksqlDB - as a stream, because we need to repartition the data to add a key. If you control the producer writing to the topic, you could do this upstream and save a step.

CREATE STREAM sensor_data_raw (city VARCHAR, temperature DOUBLE, sensorId VARCHAR) 
    WITH (KAFKA_TOPIC='ksqldb_topic_01', VALUE_FORMAT='JSON');
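
(As an aside, if you did control the producer, one way to set the key at produce time would be kafkacat's -K flag, which declares a key delimiter. This is just a sketch; the composite key values shown are illustrative:)

# Sketch: produce keyed messages (key before the ':', JSON value after it)
kafkacat -b kafka-1:39092 -P -t ksqldb_topic_01 -K: <<EOF
Madridsensor03:{"city":"Madrid","temperature":20,"sensorId":"sensor03"}
Sevillasensor01:{"city":"Sevilla","temperature":10,"sensorId":"sensor01"}
EOF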

Repartition the data based on the composite key.

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM sensor_data_repartitioned WITH (VALUE_FORMAT='AVRO') AS
    SELECT *
      FROM sensor_data_raw
    PARTITION BY city+sensorId;

Two things to note:

  1. I'm taking the opportunity to re-serialise to Avro here - if you'd rather keep JSON throughout, just omit the WITH (VALUE_FORMAT='AVRO') clause (see the sketch after this list).
  2. When the data is repartitioned, the ordering guarantees are lost, so in theory you could end up with events out of order after this step.
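
For reference, the JSON-only variant of that statement would look like this (the same query, just without the format clause):

-- Identical to the statement above, minus the Avro re-serialisation
CREATE STREAM sensor_data_repartitioned AS
    SELECT *
      FROM sensor_data_raw
    PARTITION BY city+sensorId;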

At this point we can inspect the transformed topic:

ksql> PRINT SENSOR_DATA_REPARTITIONED FROM BEGINNING LIMIT 5;
Format:AVRO
1/24/20 9:55:54 AM UTC, Madridsensor03, {"CITY": "Madrid", "TEMPERATURE": 20.0, "SENSORID": "sensor03"}
1/24/20 9:55:54 AM UTC, Madridsensor03, {"CITY": "Madrid", "TEMPERATURE": 5.0, "SENSORID": "sensor03"}
1/24/20 9:55:54 AM UTC, Sevillasensor01, {"CITY": "Sevilla", "TEMPERATURE": 10.0, "SENSORID": "sensor01"}
1/24/20 9:55:54 AM UTC, Sevillasensor01, {"CITY": "Sevilla", "TEMPERATURE": 15.0, "SENSORID": "sensor01"}
1/24/20 9:55:54 AM UTC, Sevillasensor03, {"CITY": "Sevilla", "TEMPERATURE": 20.0, "SENSORID": "sensor03"}

Note that the key of the Kafka message (the second field, after the timestamp) is now set correctly, compared to the original data, which had no key:

ksql> PRINT ksqldb_topic_01 FROM BEGINNING LIMIT 5;
Format:JSON
{"ROWTIME":1579859380123,"ROWKEY":"null","city":"Madrid","temperature":20,"sensorId":"sensor03"}
{"ROWTIME":1579859380123,"ROWKEY":"null","city":"Madrid","temperature":5,"sensorId":"sensor03"}
{"ROWTIME":1579859380123,"ROWKEY":"null","city":"Sevilla","temperature":10,"sensorId":"sensor01"}
{"ROWTIME":1579859380123,"ROWKEY":"null","city":"Sevilla","temperature":15,"sensorId":"sensor01"}
{"ROWTIME":1579859380123,"ROWKEY":"null","city":"Sevilla","temperature":20,"sensorId":"sensor03"}

Now we can declare a table on top of the repartitioned data. Since I'm using Avro now, I don't have to re-enter the schema; if I were using JSON, I would need to specify it again as part of this DDL.

CREATE TABLE ultimo_resgistro WITH (KAFKA_TOPIC='SENSOR_DATA_REPARTITIONED', VALUE_FORMAT='AVRO');

The key of the table is implicitly taken from ROWKEY, which is the key of the Kafka message.

ksql> SELECT ROWKEY, CITY, SENSORID, TEMPERATURE FROM ULTIMO_RESGISTRO EMIT CHANGES;
+------------------+----------+----------+-------------+
|ROWKEY            |CITY      |SENSORID  |TEMPERATURE  |
+------------------+----------+----------+-------------+
|Madridsensor03    |Madrid    |sensor03  |5.0          |
|Sevillasensor03   |Sevilla   |sensor03  |20.0         |
|Sevillasensor01   |Sevilla   |sensor01  |5.0          |
|Sevillasensor02   |Sevilla   |sensor02  |5.0          |
|Valenciasensor02  |Valencia  |sensor02  |15.0         |
|Valenciasensor03  |Valencia  |sensor03  |15.0         |

If you want to make use of pull queries (to get the latest value), then you'll need to upvote (or contribute a PR to) this issue.
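
For illustration, once that's supported, a pull query against this table would look something like this (hypothetical on 0.6.0, where it will be rejected for this kind of table):

-- Hypothetical pull query: fetch the latest value for one composite key
SELECT ROWKEY, CITY, SENSORID, TEMPERATURE
  FROM ULTIMO_RESGISTRO
 WHERE ROWKEY = 'Madridsensor03';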