Am I configuring message and ksql keys correctly to ensure my final aggregation is correct?
I have a topic populated by a JDBC connector. It appears to have no Kafka message key:
ksql> print 'mssql-transaction-log' limit 3;
Format:AVRO
3/6/20 11:40:35 AM UTC, null, {"TransID": 8789405114, "UserID": 15, "ActionCode": 80, "GameName": "thisgame", "GameID": 148362, "DataCashRef": null, "Success": "Y", "StartBalance": 188036, "Amount": -25, "EndBalance": 188011, "BonusStartBalance": 10000, "BonusAmount": 0, "BonusEndBalance": 10000, "Stamp": 1583162921467, "SiteID": 6}
I have created a stream from it:
CREATE STREAM TRANSACTIONS_LOG_RAW
(
TRANSID BIGINT,
USERID INTEGER,
ACTIONCODE INTEGER,
GAMENAME STRING,
GAMEID BIGINT,
DATACASHREF STRING,
SUCCESS STRING,
STARTBALANCE INTEGER,
AMOUNT INTEGER,
ENDBALANCE INTEGER,
BONUSSTARTBALANCE INTEGER,
BONUSAMOUNT INTEGER,
BONUSENDBALANCE INTEGER,
STAMP BIGINT,
SITEID INTEGER
)
WITH (KAFKA_TOPIC='mssql-transaction-log',
VALUE_FORMAT='AVRO',
KEY='USERID');
I have created a filtered stream from this:
CREATE STREAM GAME_PURCHASES_RAW AS
SELECT USERID,
GAMENAME,
AMOUNT,
STAMP,
TIMESTAMPTOSTRING(STAMP, 'yyyyMMddHH') HOUR_DIMENSION,
TIMESTAMPTOSTRING(STAMP, 'yyyyMMdd') DAY_DIMENSION
FROM TRANSACTIONS_LOG_RAW
WHERE ACTIONCODE = 80
PARTITION BY USERID;
When I inspect the messages, there is no Kafka key:
ksql> print 'GAME_PURCHASES_RAW' limit 3;
Format:AVRO
3/6/20 11:40:35 AM UTC, null, {"USERID": 58, "GAMENAME": "game", "AMOUNT": -50, "STAMP": 1583162898780, "HOUR_DIMENSION": "2020030215", "DAY_DIMENSION": "20200302"}
3/6/20 11:40:35 AM UTC, null, {"USERID": 191, "GAMENAME": "game", "AMOUNT": -10, "STAMP": 1583162898780, "HOUR_DIMENSION": "2020030215", "DAY_DIMENSION": "20200302"}
3/6/20 11:40:35 AM UTC, null, {"USERID": 70, "GAMENAME": "game", "AMOUNT": -10, "STAMP": 1583162898980, "HOUR_DIMENSION": "2020030215", "DAY_DIMENSION": "20200302"}
When I describe the stream, it shows a key:
ksql> describe GAME_PURCHASES_RAW;
Name : GAME_PURCHASES_RAW
Field | Type
--------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
USERID | INTEGER (key)
GAMENAME | VARCHAR(STRING)
AMOUNT | INTEGER
STAMP | BIGINT
HOUR_DIMENSION | VARCHAR(STRING)
DAY_DIMENSION | VARCHAR(STRING)
--------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>
I am going to create an aggregation from this GAME_PURCHASES_RAW stream, grouped by USERID. My understanding is that for the aggregation the Kafka message key must not be null, because I need the messages for each unique USERID to land on the same partition.
Why does the GAME_PURCHASES_RAW stream not show a key in the Kafka messages of the topic it created?
Am I configuring the message and ksql keys correctly to ensure my final aggregation is correct?
(I suspect my understanding of Kafka keys vs. ksql stream keys is lacking at some fundamental level.)
The problem is that you have set KEY='USERID' in the WITH clause of TRANSACTIONS_LOG_RAW. Remove it and it will work.
Setting KEY tells KSQL that the data in the specified column is identical to the data in the row key. But that is not the case here! The key in your source topic is null.
So when you later PARTITION BY USERID, ksqlDB does not repartition your data and set the key, because you have told it the data is already partitioned by USERID.
You are not the only one to make this mistake. KEY in the WITH clause is a common source of confusion, which is why it was removed in later versions. The version you are running is quite old; a lot has changed and improved since then, and I would recommend upgrading!