ksqlDB push query doesn't emit all changes
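I'm new to ksqlDB and I've started experimenting with very basic aggregations. I created a topic that receives a few messages per second; these messages are streamed and then output to a materialized table.
I can see that my stream is up to date with the topic, but the materialized table doesn't output every change. I can't find anywhere in the documentation how to emit every update instead of only one every 1 or 2 seconds.
Here is my setup:
Topic (JSON):
key: username, value: { balance_change }
Stream: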
create stream balance_stream (user varchar key, balance_change bigint)
with (kafka_topic='balance', value_format='JSON');
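To reproduce the behaviour without an external producer, test rows can be written straight into the stream from the ksqlDB CLI with INSERT INTO ... VALUES. This is a minimal sketch: the user names and amounts below are made-up sample data, not values from the original setup.

```sql
-- Hypothetical sample data: 'bob', 'alice' and the amounts are illustrative only.
-- Each INSERT writes one record to the underlying 'balance' topic,
-- keyed by user, with a JSON value holding the balance change.
INSERT INTO balance_stream (user, balance_change) VALUES ('bob', 10);
INSERT INTO balance_stream (user, balance_change) VALUES ('bob', -3);
INSERT INTO balance_stream (user, balance_change) VALUES ('alice', 25);
```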
Materialized table:
create table balance_table as
select user,
sum(balance_change) balance
from balance_stream
group by user
emit changes;
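One way to observe what the table emits is to run a push query against it in a second CLI session; a pull query instead returns the current value for a single key and terminates. This sketch assumes a ksqlDB release in which pull queries are allowed on tables created with CREATE TABLE AS SELECT; 'bob' is the user from the question.

```sql
-- Start the push query from the beginning of the table's topic
SET 'auto.offset.reset' = 'earliest';

-- Push query: prints each row change the table emits, until cancelled
SELECT user, balance FROM balance_table EMIT CHANGES;

-- Pull query: returns bob's current balance once, then terminates
SELECT user, balance FROM balance_table WHERE user = 'bob';
```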
As you can see in the video below, my user bob gets his total balance very slowly:
https://www.youtube.com/watch?v=0HmCA3ueUo0
How can I get all updates from my balance_table?
There are fewer than 900 messages in the table, so it should be able to sum all the values immediately.
Name : BALANCE_TABLE
Type : TABLE
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : JSON
Kafka topic : BALANCE_TABLE (partitions: 1, replication: 1)
Statement : CREATE TABLE BALANCE_TABLE WITH (KAFKA_TOPIC='BALANCE_TABLE', PARTITIONS=1, REPLICAS=1) AS SELECT
BALANCE_STREAM.USER USER,
SUM(BALANCE_STREAM.BALANCE_CHANGE) BALANCE
FROM BALANCE_STREAM BALANCE_STREAM
GROUP BY BALANCE_STREAM.USER
EMIT CHANGES;
Field | Type
------------------------------------------
USER | VARCHAR(STRING) (primary key)
BALANCE | BIGINT
------------------------------------------
Queries that write from this TABLE
-----------------------------------
CTAS_BALANCE_TABLE_155 (RUNNING) : CREATE TABLE BALANCE_TABLE WITH (KAFKA_TOPIC='BALANCE_TABLE', PARTITIONS=1, REPLICAS=1) AS SELECT BALANCE_STREAM.USER USER, SUM(BALANCE_STREAM.BALANCE_CHANGE) BALANCE FROM BALANCE_STREAM BALANCE_STREAM GROUP BY BALANCE_STREAM.USER EMIT CHANGES;
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
messages-per-sec: 0.28 total-messages: 814 last-message: 2021-06-17T16:08:43.091Z
(Statistics of the local KSQL server interaction with the Kafka topic BALANCE_TABLE)
Consumer Groups summary:
Consumer Group : _confluent-ksql-ksqldbquery_CTAS_BALANCE_TABLE_155
Kafka topic : balance
Max lag : 0
Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
0 | 0 | 21230 | 21230 | 0
------------------------------------------------------
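The problem was the default server settings. ksqlDB buffers aggregation results in a record cache and forwards them downstream only when the query commits (every 2000 ms by default, controlled by 'commit.interval.ms') or when the cache fills up, so several updates to the same key within one commit interval are collapsed into a single output row.
This can be solved by lowering the value of 'commit.interval.ms'.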
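A sketch of applying this from the ksqlDB CLI, assuming a ksqlDB release from around 2021. SET only affects queries started afterwards in the same session, so the persistent query behind BALANCE_TABLE has to be dropped and recreated. The value 100 is illustrative, and setting 'cache.max.bytes.buffering' to 0 (disabling the record cache) is an optional, more aggressive alternative that forwards every update at the cost of more writes. On the server itself the same settings go in the properties file with the ksql.streams. prefix, e.g. ksql.streams.commit.interval.ms.

```sql
-- Commit, and therefore flush cached aggregates, every 100 ms (illustrative value)
SET 'commit.interval.ms' = '100';
-- Optional: disable the record cache so every single update is emitted
SET 'cache.max.bytes.buffering' = '0';
-- Re-read the whole 'balance' topic so the recreated sums start from zero offset
SET 'auto.offset.reset' = 'earliest';

-- Drop the old table and its topic
-- (older releases require TERMINATE CTAS_BALANCE_TABLE_155; before the DROP)
DROP TABLE balance_table DELETE TOPIC;

-- Recreate it; the new persistent query picks up the properties set above
CREATE TABLE balance_table AS
  SELECT user,
         SUM(balance_change) AS balance
  FROM balance_stream
  GROUP BY user
  EMIT CHANGES;
```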