KSQL 查询在简单聚合中返回意外值
KSQL Query returning unexpected values in simple aggregation
我从针对本身由 Kafka 主题定义的 KTable 的 KSQL 查询中得到意外结果。 KTABLE 是 "Trades",它由压缩主题 "localhost.dbo.TradeHistory" 支持。它应该包含由 TradeId 键入的股票交易的最新信息。该主题的键是 TradeId。每笔交易都有一个 AccountId,我正在尝试构建一个查询以获取按账户分组的交易金额的总和。
交易KTABLE的定义
ksql> create table Trades(TradeId int, AccountId int, Spn int, Amount double) with (KAFKA_TOPIC = 'localhost.dbo.TradeHistory', VALUE_FORMAT = 'JSON', KEY = 'TradeId');
...
ksql> describe extended Trades;
Name : TRADES
Type : TABLE
Key field : TRADEID
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : JSON
Kafka topic : localhost.dbo.TradeHistory (partitions: 1, replication: 1)
Field | Type
---------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
TRADEID | INTEGER
ACCOUNTID | INTEGER
SPN | INTEGER
AMOUNT | DOUBLE
---------------------------------------
Local runtime statistics
------------------------
consumer-messages-per-sec: 0 consumer-total-bytes: 3709 consumer-total-messages: 39 last-message: 2019-10-12T20:52:16.552Z
(Statistics of the local KSQL server interaction with the Kafka topic localhost.dbo.TradeHistory)
localhost.dbo.TradeHistory 主题的配置
/usr/bin/kafka-topics --zookeeper zookeeper:2181 --describe --topic localhost.dbo.TradeHistory
Topic:localhost.dbo.TradeHistory PartitionCount:1 ReplicationFactor:1 Configs:min.cleanable.dirty.ratio=0.01,delete.retention.ms=100,cleanup.policy=compact,segment.ms=100
Topic: localhost.dbo.TradeHistory Partition: 0 Leader: 1 Replicas: 1 Isr: 1
在我的测试中,我使用 TradeId 2 将消息添加到 localhost.dbo.TradeHistory 主题,这只会改变交易量。仅更新金额; AccountId 保持为 1.
localhost.dbo.TradeHistory 主题中的消息
/usr/bin/kafka-console-consumer --bootstrap-server broker:9092 --property print.key=true --topic localhost.dbo.TradeHistory --from-beginning
... (earlier values redacted) ...
2 {"TradeHistoryId":47,"TradeId":2,"AccountId":1,"Spn":1,"Amount":106.0,"__table":"TradeHistory"}
2 {"TradeHistoryId":48,"TradeId":2,"AccountId":1,"Spn":1,"Amount":107.0,"__table":"TradeHistory"}
上面的主题转储显示交易量 2(在帐户 1 中)从 106.0 变为 107.0。
KSQL 查询
ksql> select AccountId, count(*) as Count, sum(Amount) as Total from Trades group by AccountId;
1 | 1 | 106.0
1 | 0 | 0.0
1 | 1 | 107.0
问题是,为什么每次我发布交易更新时,上面显示的 KSQL 查询 return 一个 "intermediate" 值。如您所见,Count 和 Amount 字段显示 0,0,然后 KSQL 立即查询 "corrects" 到 1,107.0。我对这种行为有点困惑。
谁能解释一下?
非常感谢。
感谢您的提问。我已经在我们的知识库中添加了一个答案:https://github.com/confluentinc/ksql/pull/3594/files.
当 KSQL 发现 table 中现有行的更新时,它会在内部发出一个 CDC 事件,其中包含旧值和新值。
聚合通过在应用新值之前首先撤消旧值来处理此问题。
因此,在上面的示例中,当发生第二次插入时,KSQL 首先撤消旧值。这导致 COUNT
下降 1,SUM
下降 106.0
的旧值,即下降到零。
然后 KSQL 应用新的行值,它看到 COUNT
上升 1,SUM
上升新值 107.0
.
默认情况下,KSQL 配置为在将结果刷新到 Kafka 之前缓冲 最多 2 秒或 10MB 数据的结果。这就是为什么在此示例中插入值时您可能会看到输出略有延迟。如果两个输出行一起缓冲,那么 KSQL 将抑制第一个结果。这就是为什么您经常看不到正在输出的中间行。配置 commit.interval.ms
和 cache.max.bytes.buffering
,分别设置为 2 秒和 10MB,可用于调整此行为。将这些设置中的任何一个设置为零都会导致 KSQL 始终输出所有中间结果。
如果您每次都看到这些中间结果输出,那么很可能您已将这些设置中的一个或两个设置为零。
我们有一个 Github issue 来增强 KSQL 以利用 Kafka Stream 的抑制功能,
这将允许用户更好地控制结果的具体化方式。
我从针对本身由 Kafka 主题定义的 KTable 的 KSQL 查询中得到意外结果。 KTABLE 是 "Trades",它由压缩主题 "localhost.dbo.TradeHistory" 支持。它应该包含由 TradeId 键入的股票交易的最新信息。该主题的键是 TradeId。每笔交易都有一个 AccountId,我正在尝试构建一个查询以获取按账户分组的交易金额的总和。
交易KTABLE的定义
ksql> create table Trades(TradeId int, AccountId int, Spn int, Amount double) with (KAFKA_TOPIC = 'localhost.dbo.TradeHistory', VALUE_FORMAT = 'JSON', KEY = 'TradeId');
...
ksql> describe extended Trades;
Name : TRADES
Type : TABLE
Key field : TRADEID
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : JSON
Kafka topic : localhost.dbo.TradeHistory (partitions: 1, replication: 1)
Field | Type
---------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
TRADEID | INTEGER
ACCOUNTID | INTEGER
SPN | INTEGER
AMOUNT | DOUBLE
---------------------------------------
Local runtime statistics
------------------------
consumer-messages-per-sec: 0 consumer-total-bytes: 3709 consumer-total-messages: 39 last-message: 2019-10-12T20:52:16.552Z
(Statistics of the local KSQL server interaction with the Kafka topic localhost.dbo.TradeHistory)
localhost.dbo.TradeHistory 主题的配置
/usr/bin/kafka-topics --zookeeper zookeeper:2181 --describe --topic localhost.dbo.TradeHistory
Topic:localhost.dbo.TradeHistory PartitionCount:1 ReplicationFactor:1 Configs:min.cleanable.dirty.ratio=0.01,delete.retention.ms=100,cleanup.policy=compact,segment.ms=100
Topic: localhost.dbo.TradeHistory Partition: 0 Leader: 1 Replicas: 1 Isr: 1
在我的测试中,我使用 TradeId 2 将消息添加到 localhost.dbo.TradeHistory 主题,这只会改变交易量。仅更新金额; AccountId 保持为 1.
localhost.dbo.TradeHistory 主题中的消息
/usr/bin/kafka-console-consumer --bootstrap-server broker:9092 --property print.key=true --topic localhost.dbo.TradeHistory --from-beginning
... (earlier values redacted) ...
2 {"TradeHistoryId":47,"TradeId":2,"AccountId":1,"Spn":1,"Amount":106.0,"__table":"TradeHistory"}
2 {"TradeHistoryId":48,"TradeId":2,"AccountId":1,"Spn":1,"Amount":107.0,"__table":"TradeHistory"}
上面的主题转储显示交易量 2(在帐户 1 中)从 106.0 变为 107.0。
KSQL 查询
ksql> select AccountId, count(*) as Count, sum(Amount) as Total from Trades group by AccountId;
1 | 1 | 106.0
1 | 0 | 0.0
1 | 1 | 107.0
问题是,为什么每次我发布交易更新时,上面显示的 KSQL 查询 return 一个 "intermediate" 值。如您所见,Count 和 Amount 字段显示 0,0,然后 KSQL 立即查询 "corrects" 到 1,107.0。我对这种行为有点困惑。
谁能解释一下?
非常感谢。
感谢您的提问。我已经在我们的知识库中添加了一个答案:https://github.com/confluentinc/ksql/pull/3594/files.
当 KSQL 发现 table 中现有行的更新时,它会在内部发出一个 CDC 事件,其中包含旧值和新值。 聚合通过在应用新值之前首先撤消旧值来处理此问题。
因此,在上面的示例中,当发生第二次插入时,KSQL 首先撤消旧值。这导致 COUNT
下降 1,SUM
下降 106.0
的旧值,即下降到零。
然后 KSQL 应用新的行值,它看到 COUNT
上升 1,SUM
上升新值 107.0
.
默认情况下,KSQL 配置为在将结果刷新到 Kafka 之前缓冲 最多 2 秒或 10MB 数据的结果。这就是为什么在此示例中插入值时您可能会看到输出略有延迟。如果两个输出行一起缓冲,那么 KSQL 将抑制第一个结果。这就是为什么您经常看不到正在输出的中间行。配置 commit.interval.ms
和 cache.max.bytes.buffering
,分别设置为 2 秒和 10MB,可用于调整此行为。将这些设置中的任何一个设置为零都会导致 KSQL 始终输出所有中间结果。
如果您每次都看到这些中间结果输出,那么很可能您已将这些设置中的一个或两个设置为零。
我们有一个 Github issue 来增强 KSQL 以利用 Kafka Stream 的抑制功能, 这将允许用户更好地控制结果的具体化方式。