KSQL 窗口聚合流
KSQL Windowed Aggregation Stream
我正在尝试使用 KSQL Windowed Aggregation, specifically the Session Window.
按其属性之一并随着时间的推移对事件进行分组
我有一个 STREAM
由一个 kafka 主题制成,TIMESTAMP
属性 明确指定。
当我尝试使用会话窗口创建 STREAM
时,查询如下:
CREATE STREAM SESSION_STREAM AS
SELECT ...
FROM EVENT_STREAM
WINDOW SESSION (5 MINUTES)
GROUP BY ...;
我总是得到错误:
Your SELECT query produces a TABLE. Please use CREATE TABLE AS SELECT statement instead.
是否可以使用窗口聚合创建 STREAM
?
当我尝试按照建议创建一个 TABLE
,然后创建一个包含所有会话开始事件的 STREAM
时,查询如下:
CREATE STREAM SESSION_START_STREAM AS
SELECT *
FROM SESSION_TABLE
WHERE WINDOWSTART=WINDOWEND;
KSQL 告诉我:
KSQL does not support persistent queries on windowed tables
如何在 KSQL 中创建 STREAM
开始会话 window 的事件?
您的创建流语句,如果切换到创建 table 语句,将创建一个不断更新的 table。接收器主题 SESSION_STREAM
将包含对 table 的更改流,即它的更改日志。
ksqlDB 将其建模为 TABLE,因为它具有 TABLE 语义,即在具有任何特定键的 table 中只能存在一行。但是,更改日志将包含已应用于 table.
的更改流
如果您想要的是包含所有会话的主题,那么将创建这样的主题:
-- create a stream with a new 'data' topic:
CREATE STREAM DATA (USER_ID INT)
WITH (kafka_topic='data', value_format='json');
-- create a table that tracks user interactions per session:
CREATE TABLE SESSION AS
SELECT USER_ID, COUNT(USER_ID) AS COUNT
FROM DATA
WINDOW SESSION (5 SECONDS)
GROUP BY USER_ID;
这将创建一个 SESSIONS
主题,其中包含对 SESSIONS
table 的更改:即它的更改日志。
如果您想将其转换为会话启动事件流,那么不幸的是 ksqlDB 还 不允许您直接更改从 [=31= 创建流],但您可以在 table 的更改日志上创建流:
-- Create a stream over the existing `SESSIONS` topic.
-- Note it states the window_type is 'Session'.
CREATE STREAM SESSION_STREAM (ROWKEY INT KEY, COUNT BIGINT)
WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');
-- Create a stream of window start events:
CREATE STREAM SESSION_STARTS AS
SELECT * FROM SESSION_STREAM
WHERE WINDOWSTART = WINDOWEND;
请注意,在即将发布的 0.10 版本中,您将能够正确命名 SESSION_STREAM
中的键列:
CREATE STREAM SESSION_STREAM (USER_ID INT KEY, COUNT BIGINT)
WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');
我正在尝试使用 KSQL Windowed Aggregation, specifically the Session Window.
按其属性之一并随着时间的推移对事件进行分组我有一个 STREAM
由一个 kafka 主题制成,TIMESTAMP
属性 明确指定。
当我尝试使用会话窗口创建 STREAM
时,查询如下:
CREATE STREAM SESSION_STREAM AS
SELECT ...
FROM EVENT_STREAM
WINDOW SESSION (5 MINUTES)
GROUP BY ...;
我总是得到错误:
Your SELECT query produces a TABLE. Please use CREATE TABLE AS SELECT statement instead.
是否可以使用窗口聚合创建 STREAM
?
当我尝试按照建议创建一个 TABLE
,然后创建一个包含所有会话开始事件的 STREAM
时,查询如下:
CREATE STREAM SESSION_START_STREAM AS
SELECT *
FROM SESSION_TABLE
WHERE WINDOWSTART=WINDOWEND;
KSQL 告诉我:
KSQL does not support persistent queries on windowed tables
如何在 KSQL 中创建 STREAM
开始会话 window 的事件?
您的创建流语句,如果切换到创建 table 语句,将创建一个不断更新的 table。接收器主题 SESSION_STREAM
将包含对 table 的更改流,即它的更改日志。
ksqlDB 将其建模为 TABLE,因为它具有 TABLE 语义,即在具有任何特定键的 table 中只能存在一行。但是,更改日志将包含已应用于 table.
的更改流如果您想要的是包含所有会话的主题,那么将创建这样的主题:
-- create a stream with a new 'data' topic:
CREATE STREAM DATA (USER_ID INT)
WITH (kafka_topic='data', value_format='json');
-- create a table that tracks user interactions per session:
CREATE TABLE SESSION AS
SELECT USER_ID, COUNT(USER_ID) AS COUNT
FROM DATA
WINDOW SESSION (5 SECONDS)
GROUP BY USER_ID;
这将创建一个 SESSIONS
主题,其中包含对 SESSIONS
table 的更改:即它的更改日志。
如果您想将其转换为会话启动事件流,那么不幸的是 ksqlDB 还 不允许您直接更改从 [=31= 创建流],但您可以在 table 的更改日志上创建流:
-- Create a stream over the existing `SESSIONS` topic.
-- Note it states the window_type is 'Session'.
CREATE STREAM SESSION_STREAM (ROWKEY INT KEY, COUNT BIGINT)
WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');
-- Create a stream of window start events:
CREATE STREAM SESSION_STARTS AS
SELECT * FROM SESSION_STREAM
WHERE WINDOWSTART = WINDOWEND;
请注意,在即将发布的 0.10 版本中,您将能够正确命名 SESSION_STREAM
中的键列:
CREATE STREAM SESSION_STREAM (USER_ID INT KEY, COUNT BIGINT)
WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');