Save last record according to a composite key. ksqlDB 0.6.0
I have a Kafka topic (ksqldb_topic_01) with the following stream of data:
% Reached end of topic ksqldb_topic_01 [0] at offset 213
{"city":"Sevilla","temperature":20,"sensorId":"sensor03"}
% Reached end of topic ksqldb_topic_01 [0] at offset 214
{"city":"Madrid","temperature":5,"sensorId":"sensor03"}
% Reached end of topic ksqldb_topic_01 [0] at offset 215
{"city":"Sevilla","temperature":10,"sensorId":"sensor01"}
% Reached end of topic ksqldb_topic_01 [0] at offset 216
{"city":"Valencia","temperature":15,"sensorId":"sensor03"}
% Reached end of topic ksqldb_topic_01 [0] at offset 217
{"city":"Sevilla","temperature":15,"sensorId":"sensor01"}
% Reached end of topic ksqldb_topic_01 [0] at offset 218
{"city":"Madrid","temperature":20,"sensorId":"sensor03"}
% Reached end of topic ksqldb_topic_01 [0] at offset 219
{"city":"Valencia","temperature":15,"sensorId":"sensor02"}
% Reached end of topic ksqldb_topic_01 [0] at offset 220
{"city":"Sevilla","temperature":5,"sensorId":"sensor02"}
% Reached end of topic ksqldb_topic_01 [0] at offset 221
{"city":"Sevilla","temperature":5,"sensorId":"sensor01"}
% Reached end of topic ksqldb_topic_01 [0] at offset 222
I want to save in a table the last value that arrives on the topic, for each city and sensorId.
In my ksqlDB I have created the following table:
CREATE TABLE ultimo_resgistro(city VARCHAR,sensorId VARCHAR,temperature INTEGER) WITH (KAFKA_TOPIC='ksqldb_topic_01', VALUE_FORMAT='json',KEY = 'sensorId,city');
DESCRIBE EXTENDED ULTIMO_RESGISTRO;
Name : ULTIMO_RESGISTRO
Type : TABLE
Key field : SENSORID
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : JSON
Kafka topic : ksqldb_topic_01 (partitions: 1, replication: 1)
Field | Type
-----------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
CITY | VARCHAR(STRING)
SENSORID | VARCHAR(STRING)
TEMPERATURE | INTEGER
-----------------------------------------
To see whether the data is being processed, I run:
select * from ultimo_resgistro emit changes;
+------------------+------------------+------------------+------------------+------------------+
|ROWTIME |ROWKEY |CITY |SENSORID |TEMPERATURE |
+------------------+------------------+------------------+------------------+------------------+
key cannot be null
Query terminated
The problem is that you need to set the key of your Kafka message correctly. You also can't specify two fields in the KEY clause. You can read more about this here.
Here's an example of how to do it.
First, load the test data:
kafkacat -b kafka-1:39092 -P -t ksqldb_topic_01 <<EOF
{"city":"Madrid","temperature":20,"sensorId":"sensor03"}
{"city":"Madrid","temperature":5,"sensorId":"sensor03"}
{"city":"Sevilla","temperature":10,"sensorId":"sensor01"}
{"city":"Sevilla","temperature":15,"sensorId":"sensor01"}
{"city":"Sevilla","temperature":20,"sensorId":"sensor03"}
{"city":"Sevilla","temperature":5,"sensorId":"sensor01"}
{"city":"Sevilla","temperature":5,"sensorId":"sensor02"}
{"city":"Valencia","temperature":15,"sensorId":"sensor02"}
{"city":"Valencia","temperature":15,"sensorId":"sensor03"}
EOF
Now declare a schema on top of the topic in ksqlDB - as a stream, because we need to repartition the data to add a key. If you control the producer writing to the topic, you could set the key upstream and save yourself this step.
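If you did set the key upstream in the producer, it would need to match what the repartitioning below produces. As a minimal sketch (the `keyed_message` helper is hypothetical, not part of any Kafka client API), `PARTITION BY city+sensorId` keys the message with the plain string concatenation of the two fields:

```python
import json

def keyed_message(record):
    # Hypothetical helper: build the (key, value) pair you would hand to a
    # Kafka producer. The key mirrors ksqlDB's PARTITION BY city+sensorId,
    # i.e. simple string concatenation, e.g. 'Madridsensor03'.
    key = record["city"] + record["sensorId"]
    value = json.dumps(record)
    return key, value

key, value = keyed_message({"city": "Madrid", "temperature": 20, "sensorId": "sensor03"})
print(key)  # Madridsensor03
```

You would then pass `key` and `value` to your producer's send call so the topic is keyed before ksqlDB ever sees it.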
CREATE STREAM sensor_data_raw (city VARCHAR, temperature DOUBLE, sensorId VARCHAR)
WITH (KAFKA_TOPIC='ksqldb_topic_01', VALUE_FORMAT='JSON');
Repartition the data on the composite key.
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM sensor_data_repartitioned WITH (VALUE_FORMAT='AVRO') AS
SELECT *
FROM sensor_data_raw
PARTITION BY city+sensorId;
Two things to note:
- I'm taking the opportunity to reserialise to Avro - if you'd rather stick with JSON throughout, just omit the WITH (VALUE_FORMAT='AVRO') clause.
- When data is repartitioned, ordering guarantees are lost, so in theory you could end up with events out of order after this step.
At this point we can inspect the transformed topic:
ksql> PRINT SENSOR_DATA_REPARTITIONED FROM BEGINNING LIMIT 5;
Format:AVRO
1/24/20 9:55:54 AM UTC, Madridsensor03, {"CITY": "Madrid", "TEMPERATURE": 20.0, "SENSORID": "sensor03"}
1/24/20 9:55:54 AM UTC, Madridsensor03, {"CITY": "Madrid", "TEMPERATURE": 5.0, "SENSORID": "sensor03"}
1/24/20 9:55:54 AM UTC, Sevillasensor01, {"CITY": "Sevilla", "TEMPERATURE": 10.0, "SENSORID": "sensor01"}
1/24/20 9:55:54 AM UTC, Sevillasensor01, {"CITY": "Sevilla", "TEMPERATURE": 15.0, "SENSORID": "sensor01"}
1/24/20 9:55:54 AM UTC, Sevillasensor03, {"CITY": "Sevilla", "TEMPERATURE": 20.0, "SENSORID": "sensor03"}
Note that the key in the Kafka message (the second field, after the timestamp) is now set correctly, compared to the original data, which has no key:
ksql> PRINT ksqldb_topic_01 FROM BEGINNING LIMIT 5;
Format:JSON
{"ROWTIME":1579859380123,"ROWKEY":"null","city":"Madrid","temperature":20,"sensorId":"sensor03"}
{"ROWTIME":1579859380123,"ROWKEY":"null","city":"Madrid","temperature":5,"sensorId":"sensor03"}
{"ROWTIME":1579859380123,"ROWKEY":"null","city":"Sevilla","temperature":10,"sensorId":"sensor01"}
{"ROWTIME":1579859380123,"ROWKEY":"null","city":"Sevilla","temperature":15,"sensorId":"sensor01"}
{"ROWTIME":1579859380123,"ROWKEY":"null","city":"Sevilla","temperature":20,"sensorId":"sensor03"}
Now we can declare a table on top of the repartitioned data. Since I'm using Avro, I don't have to re-enter the schema. If I were using JSON, I would need to type it in again as part of this DDL.
CREATE TABLE ultimo_resgistro WITH (KAFKA_TOPIC='SENSOR_DATA_REPARTITIONED', VALUE_FORMAT='AVRO');
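For the JSON case just mentioned, a sketch of what that DDL would look like with the schema restated (mirroring the column types declared in sensor_data_raw above; this variant is inferred, not taken from the original answer):

```sql
CREATE TABLE ultimo_resgistro (city VARCHAR, sensorId VARCHAR, temperature DOUBLE)
  WITH (KAFKA_TOPIC='SENSOR_DATA_REPARTITIONED', VALUE_FORMAT='JSON');
```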
The key of the table is taken implicitly from ROWKEY, which is the key of the Kafka message.
ksql> SELECT ROWKEY, CITY, SENSORID, TEMPERATURE FROM ULTIMO_RESGISTRO EMIT CHANGES;
+------------------+----------+----------+-------------+
|ROWKEY |CITY |SENSORID |TEMPERATURE |
+------------------+----------+----------+-------------+
|Madridsensor03 |Madrid |sensor03 |5.0 |
|Sevillasensor03 |Sevilla |sensor03 |20.0 |
|Sevillasensor01 |Sevilla |sensor01 |5.0 |
|Sevillasensor02 |Sevilla |sensor02 |5.0 |
|Valenciasensor02 |Valencia |sensor02 |15.0 |
|Valenciasensor03 |Valencia |sensor03 |15.0 |
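The table semantics shown above - one row per key, holding the latest value - can be sketched in Python as a dict keyed on the concatenated key (the records are the test data loaded earlier):

```python
records = [
    {"city": "Madrid",   "temperature": 20.0, "sensorId": "sensor03"},
    {"city": "Madrid",   "temperature": 5.0,  "sensorId": "sensor03"},
    {"city": "Sevilla",  "temperature": 10.0, "sensorId": "sensor01"},
    {"city": "Sevilla",  "temperature": 15.0, "sensorId": "sensor01"},
    {"city": "Sevilla",  "temperature": 20.0, "sensorId": "sensor03"},
    {"city": "Sevilla",  "temperature": 5.0,  "sensorId": "sensor01"},
    {"city": "Sevilla",  "temperature": 5.0,  "sensorId": "sensor02"},
    {"city": "Valencia", "temperature": 15.0, "sensorId": "sensor02"},
    {"city": "Valencia", "temperature": 15.0, "sensorId": "sensor03"},
]

# Each new record for a key overwrites the previous one, just as a
# ksqlDB table keeps only the latest value per key.
latest = {}
for r in records:
    latest[r["city"] + r["sensorId"]] = r["temperature"]

print(latest["Madridsensor03"])  # 5.0  (20.0 was overwritten)
print(latest["Sevillasensor03"])  # 20.0
```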
If you want to use pull queries (to get the latest value directly), then you need to go and upvote (or contribute a PR to) this issue.