从 ksqlDB 中的流创建 Table 中串联的 Rowkey

Rowkey as Concatenated in Create Table from a Stream in ksqlDB

流是:

CREATE STREAM SENSORS_KSTREAM (sensorid INT,
  serialnumber VARCHAR,
  mfgdate VARCHAR,
  productname VARCHAR,
  customerid INT,
  locationid INT,
  macaddress VARCHAR,
  installationdate VARCHAR)
WITH (KAFKA_TOPIC='SENSORS_DETAILS', VALUE_FORMAT='AVRO', KEY='sensorid');

我用这个创建的 table 是:

CREATE TABLE SENSORS_KTABLE AS
SELECT sensorid, serialnumber, mfgdate, productname, customerid, locationid, macaddress, installationdate, COUNT(*) AS TOTAL 
FROM SENSORS_KSTREAM WINDOW TUMBLING (SIZE 1 MINUTES) 
GROUP BY sensorid, serialnumber, mfgdate, productname, customerid, locationid, macaddress, installationdate;

生成的ROWKEY不是我想要的

我只想 SENSORID 作为 rowkey。

谁能帮我做这个。

提前致谢。

PS: 我正在使用 Confluent 5.4.0 独立版。

ksqlDB在底层Kafka消息的key中存储了一个table的主键。这对于确保相同键的一致分区分配和日志压缩等重要事情至关重要。

ksqlDB 不支持复合键,尽管这是一项正在开发的功能。因此,与此同时,当您按多列分组时,ksqlDB 会尽其所能并构建您遇到的复合键。不是很好,但它实际上适用于许多用例。

您上面的语句正在创建一个 table,在主键中包含许多列 - 目前它们都被序列化为单个 STRING 值。

您要求仅在键中包含 SENSORID...但是您的 GROUP BY 子句使所有列都位于键的一部分之后。

在我看来,您的主题包含传感器的更新值流。在这种情况下,我建议研究两种选择:

  1. 如果输入主题中的每一行都包含每个传感器的所有数据,那么为什么不将其导入为 TABLE 而不是 STREAM:
CREATE TABLE SENSORS_KSTREAM (sensorid INT,
  serialnumber VARCHAR,
  mfgdate VARCHAR,
  productname VARCHAR,
  customerid INT,
  locationid INT,
  macaddress VARCHAR,
  installationdate VARCHAR)
WITH (KAFKA_TOPIC='SENSORS_DETAILS', VALUE_FORMAT='AVRO', KEY='sensorid');
  1. 或者,也许 LATEST_BY_OFFSET 可用于捕获每列的最新值:
CREATE TABLE SENSORS_KTABLE AS
SELECT sensorid, LATEST_BY_OFFSET(serialnumber), LATEST_BY_OFFSET(mfgdate), LATEST_BY_OFFSET(productname), LATEST_BY_OFFSET(customerid), LATEST_BY_OFFSET(locationid), LATEST_BY_OFFSET(macaddress), LATEST_BY_OFFSET(installationdate) 
FROM SENSORS_KSTREAM WINDOW TUMBLING (SIZE 1 MINUTES) 
GROUP BY sensorid;

LAST_BY_OFFSET 仅在几个版本前推出,因此您可能需要更新。

希望这两个选项能帮助您到达目的地。