Kafka Streams - 更新 KTable 上的聚合

Kafka Streams - updating aggregations on KTable

我有一个 KTable,其中的数据看起来像这样(键 => 值),其中键是客户 ID,值是包含一些小的 JSON 对象客户数据:

1 => { "name" : "John", "age_group":  "25-30"}
2 => { "name" : "Alice", "age_group": "18-24"}
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

我想在这个 KTable 上做一些聚合,基本上对每个 age_group 的记录数进行计数。所需的 KTable 数据如下所示:

"18-24" => 3
"25-30" => 1

假设 Alice 属于上面的 18-24 组,她的生日属于新的年龄组。支持第一个 KTable 的状态存储现在应该如下所示:

1 => { "name" : "John", "age_group":  "25-30"}
2 => { "name" : "Alice", "age_group": "25-30"} # Happy Cake Day
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

我希望生成的聚合 KTable 结果能够反映这一点。例如

"18-24" => 2
"25-30" => 2

可能 对描述的问题过于笼统 :

In Kafka Streams there is no such thing as a final aggregation... Depending on your use case, manual de-duplication would be a way to resolve the issue"

但到目前为止我只能计算出 运行 总数,例如爱丽丝的生日将被解释为:

"18-24" => 3 # Old Alice record still gets counted here
"25-30" => 2 # New Alice record gets counted here as well

编辑:这是我注意到的一些额外行为,似乎出乎意料。

我使用的拓扑结构如下:

dataKTable = builder.table("compacted-topic-1", "users-json")
    .groupBy((key, value) -> KeyValue.pair(getAgeRange(value), key))
    .count("age-range-counts")

1) 空状态

现在,从最初的空状态开始,一切看起来像这样:

compacted-topic-1
(empty)


dataKTable
(empty)


// groupBy()
Repartition topic: $APP_ID-age-range-counts-repartition
(empty)

// count()
age-range-counts state store
(empty)

2) 发送几条消息

现在,让我们向 compacted-topic-1 发送一条消息,该消息在上面作为 KTable 流式传输。这是发生的事情:

compacted-topic-1
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

dataKTable
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }


// groupBy()
// why does this generate 4 events???
Repartition topic: $APP_ID-age-range-counts-repartition
18-24 => 3
18-24 => 3
18-24 => 4
18-24 => 4

// count()
age-range-counts state store
18-24 => 0

所以我想知道:

如果您的原始 KTable 包含 id -> Json 数据(我们称之为 dataKTable),您应该能够通过

获得您想要的内容
KTable countKTablePerRange
    = dataKTable.groupBy(/* map your age-range to be the key*/)
                .count("someStoreName");

这应该适用于所有版本的 Kafka Streams API。

更新

关于重新分区主题中的4个值:是的。 "base KTable" 的每次更新都会为其 "old value" 和 "new value" 写入一条记录。这是正确更新下游 KTable 所必需的。必须从一个计数中删除旧值,并且必须将新值添加到另一个计数中。因为您的 (count) KTable 可能是分布式的(即,在多个并行的 运行ning 应用程序实例上共享),两条记录(旧的和新的)可能会在不同的实例中结束,因为它们可能具有不同的键和因此它们必须作为两个独立的记录发送。 (虽然记录格式应该比您在问题中显示的更复杂。)

这也解释了为什么需要减法器和加法器。减法器从聚合结果中删除旧记录,而加法器将新记录添加到聚合结果中。

仍然不确定为什么您在结果中看不到正确的计数。给你实例化了多少运行?也许尝试通过在 StreamsConfig.

中设置 cache.max.bytes.buffering=0 来禁用 KTable 缓存