TopologyTestDriver 在 KTable 聚合上发送不正确的消息
TopologyTestDriver sending incorrect message on KTable aggregations
我有一个聚合在 KTable 上的拓扑。
这是我创建的一种通用方法,用于根据我拥有的不同主题构建此拓扑。
public static <A, B, C> KTable<C, Set<B>> groupTable(KTable<A, B> table, Function<B, C> getKeyFunction,
Serde<C> keySerde, Serde<B> valueSerde, Serde<Set<B>> aggregatedSerde) {
return table
.groupBy((key, value) -> KeyValue.pair(getKeyFunction.apply(value), value),
Serialized.with(keySerde, valueSerde))
.aggregate(() -> new HashSet<>(), (key, newValue, agg) -> {
agg.remove(newValue);
agg.add(newValue);
return agg;
}, (key, oldValue, agg) -> {
agg.remove(oldValue);
return agg;
}, Materialized.with(keySerde, aggregatedSerde));
}
这在使用 Kafka 时效果很好,但在通过“TopologyTestDriver”进行测试时效果不佳。
在这两种情况下,当我获得更新时,首先调用 subtractor
,然后调用 adder
。问题是当使用 TopologyTestDriver
时,会发送两条消息进行更新:一条在 subtractor
调用之后,另一条在 adder
调用之后。更不用说在 subrtractor
之后和 adder
之前发送的消息处于错误阶段。
还有其他人可以确认这是一个错误吗?我已经针对 Kafka 版本 2.0.1 和 2.1.0 进行了测试。
编辑:
我在 github 中创建了一个测试用例来说明这个问题:https://github.com/mulho/topology-testcase
有两个输出记录(一个 "minus" 记录和一个 "plus" 记录)是预期的行为。理解它是如何工作的有点棘手,所以让我试着解释一下。
假设您有以下输入 table:
key | value
-----+---------
A | <10,2>
B | <10,3>
C | <11,4>
在 KTable#groupBy()
上,您将值的第一部分提取为新键(即 10
或 11
),然后对第二部分求和(即 2
, 3
, 4
) 在聚合中。因为 A
和 B
记录都将 10
作为新密钥,所以您将对 2+3
求和,并对 4
求和以获得新密钥 11
.结果 table 将是:
key | value
-----+---------
10 | 5
11 | 4
现在假设更新记录 <B,<11,5>>
将原始输入 KTable 更改为:
key | value
-----+---------
A | <10,2>
B | <11,5>
C | <11,4>
因此,新结果 table 应该总结 5+4
for 11
和 2
for 10
:
key | value
-----+---------
10 | 2
11 | 9
如果将第一个结果 table 与第二个结果进行比较,您可能会注意到 两个 行都得到了更新。从 10|5
中减去旧的 B|<10,3>
记录,得到 10|2
,将新的 B|<11,5>
记录添加到 11|4
,得到 11|9
。
这正是您看到的两条输出记录。第一个输出记录(在执行减法之后)更新第一行(它减去不再属于聚合结果的旧值),而第二个记录将新值添加到聚合结果中。在我们的示例中,减去记录将是 <10,<null,<10,3>>>
,添加记录将是 <11,<<11,5>,null>>
(这些记录的格式是 <key, <plus,minus>>
(注意减去记录只设置 minus
部分,而添加记录只设置 plus
部分)。
最后说明:不能将正负记录放在一起,因为正负记录的key可以不同(在我们的例子中是11
和10
),并且因此可能会进入不同的分区。这意味着加号和减号操作可能由不同的机器执行,因此不可能只发出一条包含加号和减号部分的记录。
我有一个聚合在 KTable 上的拓扑。 这是我创建的一种通用方法,用于根据我拥有的不同主题构建此拓扑。
public static <A, B, C> KTable<C, Set<B>> groupTable(KTable<A, B> table, Function<B, C> getKeyFunction,
Serde<C> keySerde, Serde<B> valueSerde, Serde<Set<B>> aggregatedSerde) {
return table
.groupBy((key, value) -> KeyValue.pair(getKeyFunction.apply(value), value),
Serialized.with(keySerde, valueSerde))
.aggregate(() -> new HashSet<>(), (key, newValue, agg) -> {
agg.remove(newValue);
agg.add(newValue);
return agg;
}, (key, oldValue, agg) -> {
agg.remove(oldValue);
return agg;
}, Materialized.with(keySerde, aggregatedSerde));
}
这在使用 Kafka 时效果很好,但在通过“TopologyTestDriver”进行测试时效果不佳。
在这两种情况下,当我获得更新时,首先调用 subtractor
,然后调用 adder
。问题是当使用 TopologyTestDriver
时,会发送两条消息进行更新:一条在 subtractor
调用之后,另一条在 adder
调用之后。更不用说在 subrtractor
之后和 adder
之前发送的消息处于错误阶段。
还有其他人可以确认这是一个错误吗?我已经针对 Kafka 版本 2.0.1 和 2.1.0 进行了测试。
编辑:
我在 github 中创建了一个测试用例来说明这个问题:https://github.com/mulho/topology-testcase
有两个输出记录(一个 "minus" 记录和一个 "plus" 记录)是预期的行为。理解它是如何工作的有点棘手,所以让我试着解释一下。
假设您有以下输入 table:
key | value
-----+---------
A | <10,2>
B | <10,3>
C | <11,4>
在 KTable#groupBy()
上,您将值的第一部分提取为新键(即 10
或 11
),然后对第二部分求和(即 2
, 3
, 4
) 在聚合中。因为 A
和 B
记录都将 10
作为新密钥,所以您将对 2+3
求和,并对 4
求和以获得新密钥 11
.结果 table 将是:
key | value
-----+---------
10 | 5
11 | 4
现在假设更新记录 <B,<11,5>>
将原始输入 KTable 更改为:
key | value
-----+---------
A | <10,2>
B | <11,5>
C | <11,4>
因此,新结果 table 应该总结 5+4
for 11
和 2
for 10
:
key | value
-----+---------
10 | 2
11 | 9
如果将第一个结果 table 与第二个结果进行比较,您可能会注意到 两个 行都得到了更新。从 10|5
中减去旧的 B|<10,3>
记录,得到 10|2
,将新的 B|<11,5>
记录添加到 11|4
,得到 11|9
。
这正是您看到的两条输出记录。第一个输出记录(在执行减法之后)更新第一行(它减去不再属于聚合结果的旧值),而第二个记录将新值添加到聚合结果中。在我们的示例中,减去记录将是 <10,<null,<10,3>>>
,添加记录将是 <11,<<11,5>,null>>
(这些记录的格式是 <key, <plus,minus>>
(注意减去记录只设置 minus
部分,而添加记录只设置 plus
部分)。
最后说明:不能将正负记录放在一起,因为正负记录的key可以不同(在我们的例子中是11
和10
),并且因此可能会进入不同的分区。这意味着加号和减号操作可能由不同的机器执行,因此不可能只发出一条包含加号和减号部分的记录。