有人有 KSQL 查询可以按小时计算主题中的事件吗?
Does anybody have a KSQL query that counts event in a topic on a per hour basis?
我是 KSQL 的新手,我正在尝试获取主题组中每小时的事件计数。如果不是,我会满足于计算该主题中的事件。然后我可以更改查询以在窗口基础上工作。时间戳是
为了提供更多上下文,我们假设我的主题称为 Messenger,并且事件采用 JSON 格式。这是一条示例消息:
{"name":"Won","message":"This message is from Won","ets":1642703358124}
Partition:0 Offset:69 Timestamp:1642703359427
首先创建一个关于主题的流:
CREATE STREAM my_stream (NAME VARCHAR, MESSAGE VARCHAR)
WITH (KAFKA_TOPIC='my_topic', FORMAT='JSON');
然后使用 TUMBLING
window 聚合和虚拟 GROUP BY
字段:
SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London')
AS WINDOW_START_TS,
COUNT(*) AS RECORD_CT
FROM my_stream
WINDOW TUMBLING (SIZE 1 HOURS)
GROUP BY 1
EMIT CHANGES;
如果您想覆盖从消息时间戳和 use a custom timestamp field 中获取的时间戳(我可以在您的示例中看到 ets
),您可以在流定义中这样做:
CREATE STREAM my_stream (NAME VARCHAR, MESSAGE VARCHAR, ETS BIGINT)
WITH (KAFKA_TOPIC='my_topic', FORMAT='JSON', TIMESTAMP='ets');
参考:https://rmoff.net/2020/09/08/counting-the-number-of-messages-in-a-kafka-topic/
我是 KSQL 的新手,我正在尝试获取主题组中每小时的事件计数。如果不是,我会满足于计算该主题中的事件。然后我可以更改查询以在窗口基础上工作。时间戳是 为了提供更多上下文,我们假设我的主题称为 Messenger,并且事件采用 JSON 格式。这是一条示例消息:
{"name":"Won","message":"This message is from Won","ets":1642703358124}
Partition:0 Offset:69 Timestamp:1642703359427
首先创建一个关于主题的流:
CREATE STREAM my_stream (NAME VARCHAR, MESSAGE VARCHAR)
WITH (KAFKA_TOPIC='my_topic', FORMAT='JSON');
然后使用 TUMBLING
window 聚合和虚拟 GROUP BY
字段:
SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London')
AS WINDOW_START_TS,
COUNT(*) AS RECORD_CT
FROM my_stream
WINDOW TUMBLING (SIZE 1 HOURS)
GROUP BY 1
EMIT CHANGES;
如果您想覆盖从消息时间戳和 use a custom timestamp field 中获取的时间戳(我可以在您的示例中看到 ets
),您可以在流定义中这样做:
CREATE STREAM my_stream (NAME VARCHAR, MESSAGE VARCHAR, ETS BIGINT)
WITH (KAFKA_TOPIC='my_topic', FORMAT='JSON', TIMESTAMP='ets');
参考:https://rmoff.net/2020/09/08/counting-the-number-of-messages-in-a-kafka-topic/