Am I configuring message and ksql keys correctly to ensure my final aggregation is correct?

I have a topic that is populated by a JDBC connector. It does not appear to have a Kafka message key:

ksql> print 'mssql-transaction-log' limit 3;
Format:AVRO
3/6/20 11:40:35 AM UTC, null, {"TransID": 8789405114, "UserID": 15, "ActionCode": 80, "GameName": "thisgame", "GameID": 148362, "DataCashRef": null, "Success": "Y", "StartBalance": 188036, "Amount": -25, "EndBalance": 188011, "BonusStartBalance": 10000, "BonusAmount": 0, "BonusEndBalance": 10000, "Stamp": 1583162921467, "SiteID": 6}

I have created a stream from it:

CREATE STREAM TRANSACTIONS_LOG_RAW
   (
      TRANSID BIGINT,
      USERID INTEGER,
      ACTIONCODE INTEGER,
      GAMENAME STRING,
      GAMEID BIGINT,
      DATACASHREF STRING,
      SUCCESS STRING,
      STARTBALANCE INTEGER,
      AMOUNT INTEGER,
      ENDBALANCE INTEGER,
      BONUSSTARTBALANCE INTEGER,
      BONUSAMOUNT INTEGER,
      BONUSENDBALANCE INTEGER,
      STAMP BIGINT,
      SITEID INTEGER
  )
  WITH (KAFKA_TOPIC='mssql-transaction-log',
    VALUE_FORMAT='AVRO',
    KEY='USERID');

I have then created a filtered stream from that one:

CREATE STREAM GAME_PURCHASES_RAW AS
    SELECT USERID,
    GAMENAME,
    AMOUNT,
    STAMP,
    TIMESTAMPTOSTRING(STAMP, 'yyyyMMddHH') HOUR_DIMENSION,
    TIMESTAMPTOSTRING(STAMP, 'yyyyMMdd') DAY_DIMENSION
    FROM TRANSACTIONS_LOG_RAW
    WHERE ACTIONCODE = 80
    PARTITION BY USERID;

When I inspect these messages, there is no Kafka key:

ksql> print 'GAME_PURCHASES_RAW' limit 3;
Format:AVRO
3/6/20 11:40:35 AM UTC, null, {"USERID": 58, "GAMENAME": "game", "AMOUNT": -50, "STAMP": 1583162898780, "HOUR_DIMENSION": "2020030215", "DAY_DIMENSION": "20200302"}
3/6/20 11:40:35 AM UTC, null, {"USERID": 191, "GAMENAME": "game", "AMOUNT": -10, "STAMP": 1583162898780, "HOUR_DIMENSION": "2020030215", "DAY_DIMENSION": "20200302"}
3/6/20 11:40:35 AM UTC, null, {"USERID": 70, "GAMENAME": "game", "AMOUNT": -10, "STAMP": 1583162898980, "HOUR_DIMENSION": "2020030215", "DAY_DIMENSION": "20200302"}

When I describe the stream, it shows a key:

ksql> describe GAME_PURCHASES_RAW;

Name                 : GAME_PURCHASES_RAW
 Field          | Type
--------------------------------------------
 ROWTIME        | BIGINT           (system)
 ROWKEY         | VARCHAR(STRING)  (system)
 USERID         | INTEGER          (key)
 GAMENAME       | VARCHAR(STRING)
 AMOUNT         | INTEGER
 STAMP          | BIGINT
 HOUR_DIMENSION | VARCHAR(STRING)
 DAY_DIMENSION  | VARCHAR(STRING)
--------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>

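I am going to create an aggregation from this GAME_PURCHASES_RAW stream, grouping by USERID. My understanding is that for the aggregation the Kafka message key must not be null, because I need the messages for each unique USERID to stay on the same partition.

Why does the GAME_PURCHASES_RAW stream not show a key in the Kafka messages of the topic it creates?

Am I configuring the message and ksql keys correctly to ensure my final aggregation is correct?

(I suspect my understanding of Kafka keys versus ksql stream keys is lacking at some fundamental level.)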

The problem is that you set KEY='USERID' in the WITH clause of TRANSACTIONS_LOG_RAW. Remove it and it will work.
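As a sketch of that change, the source stream from the question could be declared like this. The column list is copied from the question and only the KEY property is dropped; I have not run this against your cluster, so treat it as illustrative.

```sql
-- Same columns as in the question; the only change is that
-- KEY='USERID' is no longer present in the WITH clause, because
-- the messages in 'mssql-transaction-log' have a null key.
CREATE STREAM TRANSACTIONS_LOG_RAW
   (
      TRANSID BIGINT,
      USERID INTEGER,
      ACTIONCODE INTEGER,
      GAMENAME STRING,
      GAMEID BIGINT,
      DATACASHREF STRING,
      SUCCESS STRING,
      STARTBALANCE INTEGER,
      AMOUNT INTEGER,
      ENDBALANCE INTEGER,
      BONUSSTARTBALANCE INTEGER,
      BONUSAMOUNT INTEGER,
      BONUSENDBALANCE INTEGER,
      STAMP BIGINT,
      SITEID INTEGER
  )
  WITH (KAFKA_TOPIC='mssql-transaction-log',
    VALUE_FORMAT='AVRO');
```

The existing stream, and anything built on top of it, would need to be dropped before it can be re-created with this definition.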

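Setting KEY tells KSQL that the data in the named column is the same as the data in the row key. But that is not the case here! The key in the raw log is null.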

When you later PARTITION BY USERID, ksqlDB does not repartition your data and set the key, because you have told it that the data is already partitioned by USERID.
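Once the source stream no longer claims to be keyed by USERID, re-creating GAME_PURCHASES_RAW with the same statement from the question should make PARTITION BY USERID do a real repartition. A rough way to check this, followed by a sketch of the aggregation you describe, might look like the following. The table name GAME_PURCHASES_BY_USER and the aggregate column aliases are hypothetical, chosen only for illustration.

```sql
-- Read the source topic from the beginning when re-creating the stream.
SET 'auto.offset.reset'='earliest';

-- After re-creating GAME_PURCHASES_RAW, the second field of each
-- printed record (the message key) should no longer be null.
PRINT 'GAME_PURCHASES_RAW' LIMIT 3;

-- Hypothetical aggregation grouped by USERID, as described in the question.
CREATE TABLE GAME_PURCHASES_BY_USER AS
    SELECT USERID,
           COUNT(*) AS PURCHASE_COUNT,
           SUM(AMOUNT) AS TOTAL_AMOUNT
    FROM GAME_PURCHASES_RAW
    GROUP BY USERID;
```

If the printed keys are still null after re-creating the stream, the KEY property is probably still set somewhere upstream.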

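You are not the only one to make this mistake. Using KEY in the WITH clause is a common source of confusion, which is why we are removing it in a future release. The version you are running is quite old, and there have been a lot of changes and improvements since then. I'd recommend upgrading!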