有人有 KSQL 查询可以按小时计算主题中的事件吗?

Does anybody have a KSQL query that counts event in a topic on a per hour basis?

我是 KSQL 的新手,我正在尝试获取主题组中每小时的事件计数。如果不是,我会满足于计算该主题中的事件。然后我可以更改查询以在窗口基础上工作。时间戳是 为了提供更多上下文,我们假设我的主题称为 Messenger,并且事件采用 JSON 格式。这是一条示例消息:

{"name":"Won","message":"This message is from Won","ets":1642703358124}
Partition:0 Offset:69 Timestamp:1642703359427

首先创建一个关于主题的流:

CREATE STREAM my_stream (NAME VARCHAR, MESSAGE VARCHAR)
  WITH (KAFKA_TOPIC='my_topic', FORMAT='JSON');

然后使用 TUMBLING window 聚合和虚拟 GROUP BY 字段:

SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London')
         AS WINDOW_START_TS,
       COUNT(*) AS RECORD_CT
  FROM my_stream
        WINDOW TUMBLING (SIZE 1 HOURS)
  GROUP BY 1
  EMIT CHANGES;

如果您想覆盖从消息时间戳和 use a custom timestamp field 中获取的时间戳(我可以在您的示例中看到 ets),您可以在流定义中这样做:

CREATE STREAM my_stream (NAME VARCHAR, MESSAGE VARCHAR, ETS BIGINT)
  WITH (KAFKA_TOPIC='my_topic', FORMAT='JSON', TIMESTAMP='ets');

参考:https://rmoff.net/2020/09/08/counting-the-number-of-messages-in-a-kafka-topic/