TopologyTestDriver 在 KTable 聚合上发送不正确的消息

TopologyTestDriver sending incorrect message on KTable aggregations

我有一个聚合在 KTable 上的拓扑。 这是我创建的一种通用方法,用于根据我拥有的不同主题构建此拓扑。

public static <A, B, C> KTable<C, Set<B>> groupTable(KTable<A, B> table, Function<B, C> getKeyFunction,
        Serde<C> keySerde, Serde<B> valueSerde, Serde<Set<B>> aggregatedSerde) {
    return table
            .groupBy((key, value) -> KeyValue.pair(getKeyFunction.apply(value), value),
                    Serialized.with(keySerde, valueSerde))
            .aggregate(() -> new HashSet<>(), (key, newValue, agg) -> {
                agg.remove(newValue);
                agg.add(newValue);
                return agg;
            }, (key, oldValue, agg) -> {
                agg.remove(oldValue);
                return agg;
            }, Materialized.with(keySerde, aggregatedSerde));
}

这在使用 Kafka 时效果很好,但在通过“TopologyTestDriver”进行测试时效果不佳。

在这两种情况下,当我获得更新时,首先调用 subtractor,然后调用 adder。问题是当使用 TopologyTestDriver 时,会发送两条消息进行更新:一条在 subtractor 调用之后,另一条在 adder 调用之后。更不用说在 subrtractor 之后和 adder 之前发送的消息处于错误阶段。

还有其他人可以确认这是一个错误吗?我已经针对 Kafka 版本 2.0.1 和 2.1.0 进行了测试。

编辑:
我在 github 中创建了一个测试用例来说明这个问题:https://github.com/mulho/topology-testcase

有两个输出记录(一个 "minus" 记录和一个 "plus" 记录)是预期的行为。理解它是如何工作的有点棘手,所以让我试着解释一下。

假设您有以下输入 table:

 key |  value
-----+---------
  A  |  <10,2>
  B  |  <10,3>
  C  |  <11,4>

KTable#groupBy() 上,您将值的第一部分提取为新键(即 1011),然后对第二部分求和(即 2, 3, 4) 在聚合中。因为 AB 记录都将 10 作为新密钥,所以您将对 2+3 求和,并对 4 求和以获得新密钥 11 .结果 table 将是:

 key |  value
-----+---------
  10 |  5
  11 |  4

现在假设更新记录 <B,<11,5>> 将原始输入 KTable 更改为:

 key |  value
-----+---------
  A  |  <10,2>
  B  |  <11,5>
  C  |  <11,4>

因此,新结果 table 应该总结 5+4 for 112 for 10:

 key |  value
-----+---------
  10 |  2
  11 |  9

如果将第一个结果 table 与第二个结果进行比较,您可能会注意到 两个 行都得到了更新。从 10|5 中减去旧的 B|<10,3> 记录,得到 10|2,将新的 B|<11,5> 记录添加到 11|4,得到 11|9

这正是您看到的两条输出记录。第一个输出记录(在执行减法之后)更新第一行(它减去不再属于聚合结果的旧值),而第二个记录将新值添加到聚合结果中。在我们的示例中,减去记录将是 <10,<null,<10,3>>>,添加记录将是 <11,<<11,5>,null>>(这些记录的格式是 <key, <plus,minus>>(注意减去记录只设置 minus 部分,而添加记录只设置 plus 部分)。

最后说明:不能将正负记录放在一起,因为正负记录的key可以不同(在我们的例子中是1110),并且因此可能会进入不同的分区。这意味着加号和减号操作可能由不同的机器执行,因此不可能只发出一条包含加号和减号部分的记录。