KSQL 表是否应该为聚合的每个键显示多行?

Should KSQL tables be showing multiple rows per key for aggregates?

我对 KSQL tables 的理解是它们显示了我们数据的“原样”视图,而不是所有数据。 因此,如果我有一个简单的聚合查询并且我从我的 table SELECT,我应该及时看到数据,因为它是在这个时间点。

我的数据(流):

MY_TOPIC_STREAM:

15 | BEACH  | Steven Ebb    | over there
24 | CIRCUS | John Doe      | an adress
30 | CIRCUS | Alice Small   | another address
35 | CIRCUS | Barry Share   | a home
35 | CIRCUS | Garry Share   | a home
40 | CIRCUS | John Mee      | somewhere
45 | CIRCUS | David Three   | a place
45 | CIRCUS | Mary Three    | a place
45 | CIRCUS | Joffrey Three | a place

我的table定义:

CREATE TABLE MY_TABLE WITH (VALUE_FORMAT='AVRO') AS 
  SELECT ROWKEY AS APPLICATION, COUNT(*) AS NUM_APPLICANTS 
  FROM MY_TOPIC_STREAM
  WHERE header->eventType = 'CIRCUS' 
  GROUP BY ROWKEY;

我很困惑为什么我在 table 中看到多行,即使最终的聚合是正确的?

    SELECT * FROM MY_TABLE;

    APPLICATION       NUM_APPLICANTS
    24                1
    30                1
--> 35                1 <-- why do I see this?
    35                2
    40                1
--> 45                1 <-- why do I see this?
--> 45                2 <-- why do I see this?
    45                3

我的接收器主题也显示了与 table 输出相同的信息 - 大概这是正确的?

我预计我的 table 结果是:

    APPLICATION       NUM_APPLICANTS
    24                1
    30                1
    35                2
    40                1
    45                3

为了简洁和可读性,上面的输出被删节了,但你明白了要点。

所以 - 我对 table 和接收器主题输出的期望是否符合要求?

更新 下面的 Matthias 回答正确地解释了 table 和 sink 主题显示变更日志事件,因此看到中间值是正常的。然而,令我困惑的是我看到了 all 中间行。原来这是因为我使用的是 confluent 5.2.1 docker-compose,它设置了环境变量 KSQL_STREAMS_CACHE_MAX_BYTES_BUFFERING=0。这会禁用 KSQL 聚合中所有中间结果的缓存,因此 table 显示比预期更多的行,同时最终到达正确的聚合。将其设置为例如10MB 导致数据按预期输出。对于那些开始使用 KSQL 并使用 docker 建立实例的人来说,这个特性在文档中并不是很明显! This issue pointed me in the right direction, and this page 记录参数。我在这上面花了很长时间,无法弄清楚为什么它没有按预期运行!我希望这对某人有所帮助。

不确定您使用的是什么版本,但是,SELECT * FROM MY_TABLE;不是returntable的当前内容,但是table 的 changelog 流(这适用于旧版本;在新版本中,您显示的查询无效,因为语法已更改)。

自从从 KSQL 过渡到 ksqlDB,您显示的查询将被称为 推送查询,表示为 SELECT * FROM my_table EMIT CHANGES;

此外,ksqlDB 引入了拉取查询,允许您查找当前状态。但是 SELECT * FROM my_table; 还不支持作为拉取查询(将来会添加)。您只能对特定键进行 table 查找,即此时必须有一个 WHERE 子句。

查看文档了解更多详情:https://docs.ksqldb.io/en/latest/concepts/queries/pull/