Kafka GroupTable 测试在使用 ProcessorTopologyTestDriver 时生成额外的消息

Kafka GroupTable tests generating extra messages when using ProcessorTopologyTestDriver

我编写了一个流,它接收消息并发送 table 已出现的键。如果出现某些内容,它将显示计数 1。这是我的生产代码的简化版本,用于演示该错误。在实时 运行 中,每收到一条消息就会发送一条消息。

但是,当我 运行 在单元测试中使用 ProcessorTopologyTestDriver 时,我得到了不同的行为。如果收到之前已经看到的密钥,我会收到一条额外的消息。

如果我用键 "key1" 发送消息,然后 "key2",然后 "key1",我得到以下输出。

key1 - 1
key2 - 1
key1 - 0
key1 - 1

出于某种原因,它会在重新添加之前递减该值。这仅在使用 ProcessorTopologyTestDriver 时发生。这是预期的吗?有解决办法吗?或者这是一个错误?

这是我的拓扑结构:

final StreamsBuilder builder = new StreamsBuilder();
    KGroupedTable<String, String> groupedTable
            = builder.table(applicationConfig.sourceTopic(), Consumed.with(Serdes.String(), Serdes.String()))
            .groupBy((key, value) -> KeyValue.pair(key, value), Serialized.with(Serdes.String(), Serdes.String()));

    KTable<String, Long> countTable = groupedTable.count();

    KStream<String, Long> countTableAsStream = countTable.toStream();
    countTableAsStream.to(applicationConfig.outputTopic(), Produced.with(Serdes.String(), Serdes.Long()));

这是我的单元测试代码:

TopologyWithGroupedTable top = new TopologyWithGroupedTable(appConfig, map);
    Topology topology = top.get();
    ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, topology);
    driver.process(inputTopic, "key1", "theval", Serdes.String().serializer(), Serdes.String().serializer());
    driver.process(inputTopic, "key2", "theval", Serdes.String().serializer(), Serdes.String().serializer());
    driver.process(inputTopic, "key1", "theval", Serdes.String().serializer(), Serdes.String().serializer());

    ProducerRecord<String, Long> outputRecord = driver.readOutput(outputTopic, keyDeserializer, valueDeserializer);
    assertEquals("key1", outputRecord.key());
    assertEquals(Long.valueOf(1L), outputRecord.value());
    outputRecord = driver.readOutput(outputTopic, keyDeserializer, valueDeserializer);
    assertEquals("key2", outputRecord.key());
    assertEquals(Long.valueOf(1L), outputRecord.value());
    outputRecord = driver.readOutput(outputTopic, keyDeserializer, valueDeserializer);
    assertEquals("key1", outputRecord.key());
    assertEquals(Long.valueOf(1L), outputRecord.value()); //this fails, I get 0.  If I pull another message, it shows key1 with a count of 1

这是完整代码的回购:

https://bitbucket.org/nsinha/testtopologywithgroupedtable/src/master/

流拓扑:https://bitbucket.org/nsinha/testtopologywithgroupedtable/src/master/src/main/java/com/nick/kstreams/TopologyWithGroupedTable.java

测试代码:https://bitbucket.org/nsinha/testtopologywithgroupedtable/src/master/src/test/java/com/nick/kstreams/TopologyWithGroupedTableTests.java

这不是错误,而是设计行为(c.f。下面有解释)。

行为差异是由于 KTable 状态存储缓存(参见 https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html)。当您 运行 单元测试时,缓存在每条记录后刷新,而在您的生产 运行 中,情况并非如此。如果您在生产中禁用缓存 运行,我假设它的行为与您的单元测试相同。

Side remark: ProcessorTopologyTestDriver is an internal class and not part of public API. Thus, there is no compatibility guarantee. You should use the official unit-test packages instead: https://docs.confluent.io/current/streams/developer-guide/test-streams.html

为什么看到两条记录:

在您的代码中,您使用的是 KTable#groupBy(),并且在您的特定用例中,您没有更改密钥。但是,一般情况下,key可能会改变(取决于输入KTable的值)。因此,如果输入KTable改变,下游聚合需要remove/subtract旧的key -value pair从聚合结果中取出,并将新的key-value pair添加到聚合结果中——一般情况下,新旧pair的key是不同的,因此需要生成两条记录,因为可能会发生减法和加法在不同的实例上,因为不同的密钥可能会以不同的方式散列。这有意义吗?

因此,对于输入 KTable 的每次更新,通常需要计算两个不同键值对的两次更新两次结果 KTable。对于密钥不变的特定情况,Kafka Stream 会做同样的事情(如果密钥实际上相同,则在这种情况下没有 check/optimization 将两个操作 "merge" 合二为一)。