KSQLDB 推送查询不会发出所有更改

KSQLDB Push query doesn't emit all changes

我是 ksqlDB 的新手,我开始尝试非常基本的聚合。我创建了主题,我每秒收到很少的消息,这些消息被流式传输然后输出到具体化的 table.

我可以看到我的流与主题是最新的,但物化 table 不会输出每个更改。我在文档中找不到任何地方如何发出每个更新,而不仅仅是每 1 或 2 秒一次。

这是我的设置:

主题(JSON):

key: username, value: { balance_change }

流:

create stream balance_stream (user varchar key, balance_change bigint) 
  with (kafka_topic='balance', value_format='JSON');

具体化Table:

create table balance_table as 
select user, 
       sum(balance_change) balance 
from balance_stream 
group by user 
emit changes;

正如您在下面的视频中看到的,我的用户 bob 获得总余额的速度非常慢:

https://www.youtube.com/watch?v=0HmCA3ueUo0

如何从我的 balance_table 获取所有更新?


table 中的消息少于 900 条,应立即对所有值求和。


Name                 : BALANCE_TABLE
Type                 : TABLE
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : BALANCE_TABLE (partitions: 1, replication: 1)
Statement            : CREATE TABLE BALANCE_TABLE WITH (KAFKA_TOPIC='BALANCE_TABLE', PARTITIONS=1, REPLICAS=1) AS SELECT
  BALANCE_STREAM.USER USER,
  SUM(BALANCE_STREAM.BALANCE_CHANGE) BALANCE
FROM BALANCE_STREAM BALANCE_STREAM
GROUP BY BALANCE_STREAM.USER
EMIT CHANGES;

 Field   | Type                           
------------------------------------------
 USER    | VARCHAR(STRING)  (primary key) 
 BALANCE | BIGINT                         
------------------------------------------

Queries that write from this TABLE
-----------------------------------
CTAS_BALANCE_TABLE_155 (RUNNING) : CREATE TABLE BALANCE_TABLE WITH (KAFKA_TOPIC='BALANCE_TABLE', PARTITIONS=1, REPLICAS=1) AS SELECT   BALANCE_STREAM.USER USER,   SUM(BALANCE_STREAM.BALANCE_CHANGE) BALANCE FROM BALANCE_STREAM BALANCE_STREAM GROUP BY BALANCE_STREAM.USER EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------
messages-per-sec:      0.28   total-messages:       814     last-message: 2021-06-17T16:08:43.091Z

(Statistics of the local KSQL server interaction with the Kafka topic BALANCE_TABLE)

Consumer Groups summary:

Consumer Group       : _confluent-ksql-ksqldbquery_CTAS_BALANCE_TABLE_155

Kafka topic          : balance
Max lag              : 0

 Partition | Start Offset | End Offset | Offset | Lag 
------------------------------------------------------
 0         | 0            | 21230      | 21230  | 0   
------------------------------------------------------

问题出在默认服务器设置上

这可以通过调整 'commit.interval.ms'

的值来解决