从 ksqlDB 中的流创建 Table 中串联的 Rowkey
Rowkey as Concatenated in Create Table from a Stream in ksqlDB
流是:
CREATE STREAM SENSORS_KSTREAM (sensorid INT,
serialnumber VARCHAR,
mfgdate VARCHAR,
productname VARCHAR,
customerid INT,
locationid INT,
macaddress VARCHAR,
installationdate VARCHAR)
WITH (KAFKA_TOPIC='SENSORS_DETAILS', VALUE_FORMAT='AVRO', KEY='sensorid');
我用这个创建的 table 是:
CREATE TABLE SENSORS_KTABLE AS
SELECT sensorid, serialnumber, mfgdate, productname, customerid, locationid, macaddress, installationdate, COUNT(*) AS TOTAL
FROM SENSORS_KSTREAM WINDOW TUMBLING (SIZE 1 MINUTES)
GROUP BY sensorid, serialnumber, mfgdate, productname, customerid, locationid, macaddress, installationdate;
生成的ROWKEY不是我想要的
我只想 SENSORID 作为 rowkey。
谁能帮我做这个。
提前致谢。
PS:
我正在使用 Confluent 5.4.0 独立版。
ksqlDB在底层Kafka消息的key中存储了一个table的主键。这对于确保相同键的一致分区分配和日志压缩等重要事情至关重要。
ksqlDB 不支持复合键,尽管这是一项正在开发的功能。因此,与此同时,当您按多列分组时,ksqlDB 会尽其所能并构建您遇到的复合键。不是很好,但它实际上适用于许多用例。
您上面的语句正在创建一个 table,在主键中包含许多列 - 目前它们都被序列化为单个 STRING 值。
您要求仅在键中包含 SENSORID
...但是您的 GROUP BY 子句使所有列都位于键的一部分之后。
在我看来,您的主题包含传感器的更新值流。在这种情况下,我建议研究两种选择:
- 如果输入主题中的每一行都包含每个传感器的所有数据,那么为什么不将其导入为 TABLE 而不是 STREAM:
CREATE TABLE SENSORS_KSTREAM (sensorid INT,
serialnumber VARCHAR,
mfgdate VARCHAR,
productname VARCHAR,
customerid INT,
locationid INT,
macaddress VARCHAR,
installationdate VARCHAR)
WITH (KAFKA_TOPIC='SENSORS_DETAILS', VALUE_FORMAT='AVRO', KEY='sensorid');
- 或者,也许
LATEST_BY_OFFSET
可用于捕获每列的最新值:
CREATE TABLE SENSORS_KTABLE AS
SELECT sensorid, LATEST_BY_OFFSET(serialnumber), LATEST_BY_OFFSET(mfgdate), LATEST_BY_OFFSET(productname), LATEST_BY_OFFSET(customerid), LATEST_BY_OFFSET(locationid), LATEST_BY_OFFSET(macaddress), LATEST_BY_OFFSET(installationdate)
FROM SENSORS_KSTREAM WINDOW TUMBLING (SIZE 1 MINUTES)
GROUP BY sensorid;
LAST_BY_OFFSET 仅在几个版本前推出,因此您可能需要更新。
希望这两个选项能帮助您到达目的地。
流是:
CREATE STREAM SENSORS_KSTREAM (sensorid INT,
serialnumber VARCHAR,
mfgdate VARCHAR,
productname VARCHAR,
customerid INT,
locationid INT,
macaddress VARCHAR,
installationdate VARCHAR)
WITH (KAFKA_TOPIC='SENSORS_DETAILS', VALUE_FORMAT='AVRO', KEY='sensorid');
我用这个创建的 table 是:
CREATE TABLE SENSORS_KTABLE AS
SELECT sensorid, serialnumber, mfgdate, productname, customerid, locationid, macaddress, installationdate, COUNT(*) AS TOTAL
FROM SENSORS_KSTREAM WINDOW TUMBLING (SIZE 1 MINUTES)
GROUP BY sensorid, serialnumber, mfgdate, productname, customerid, locationid, macaddress, installationdate;
生成的ROWKEY不是我想要的
我只想 SENSORID 作为 rowkey。
谁能帮我做这个。
提前致谢。
PS: 我正在使用 Confluent 5.4.0 独立版。
ksqlDB在底层Kafka消息的key中存储了一个table的主键。这对于确保相同键的一致分区分配和日志压缩等重要事情至关重要。
ksqlDB 不支持复合键,尽管这是一项正在开发的功能。因此,与此同时,当您按多列分组时,ksqlDB 会尽其所能并构建您遇到的复合键。不是很好,但它实际上适用于许多用例。
您上面的语句正在创建一个 table,在主键中包含许多列 - 目前它们都被序列化为单个 STRING 值。
您要求仅在键中包含 SENSORID
...但是您的 GROUP BY 子句使所有列都位于键的一部分之后。
在我看来,您的主题包含传感器的更新值流。在这种情况下,我建议研究两种选择:
- 如果输入主题中的每一行都包含每个传感器的所有数据,那么为什么不将其导入为 TABLE 而不是 STREAM:
CREATE TABLE SENSORS_KSTREAM (sensorid INT,
serialnumber VARCHAR,
mfgdate VARCHAR,
productname VARCHAR,
customerid INT,
locationid INT,
macaddress VARCHAR,
installationdate VARCHAR)
WITH (KAFKA_TOPIC='SENSORS_DETAILS', VALUE_FORMAT='AVRO', KEY='sensorid');
- 或者,也许
LATEST_BY_OFFSET
可用于捕获每列的最新值:
CREATE TABLE SENSORS_KTABLE AS
SELECT sensorid, LATEST_BY_OFFSET(serialnumber), LATEST_BY_OFFSET(mfgdate), LATEST_BY_OFFSET(productname), LATEST_BY_OFFSET(customerid), LATEST_BY_OFFSET(locationid), LATEST_BY_OFFSET(macaddress), LATEST_BY_OFFSET(installationdate)
FROM SENSORS_KSTREAM WINDOW TUMBLING (SIZE 1 MINUTES)
GROUP BY sensorid;
LAST_BY_OFFSET 仅在几个版本前推出,因此您可能需要更新。
希望这两个选项能帮助您到达目的地。