测试 KafkaStreams 应用程序
Testing KafkaStreams applications
我已经设置了一个简单的聚合,将来自多个流的值放在一起,我正在尝试对其进行测试。我已经花了很多时间,但似乎无法直接理解这些概念。我的直播如下:
// Combine multiple streams together.
KStream<String, IndividualTick> tickerStream =
priceIndexStreamBuilder.stream(exchangeTopics, Consumed.with(...));
// Group by a key & compute average per key
KStream<K, AveragedTick> avgTickerStream = tickStream.selectKey((key,
value) -> value.getK())
.groupByKey(...)
.aggregate(AvgTick::new,
(key, value, aggregate) -> {
aggregate.addTick(value);
return aggregate;
},
Materialized.with(...))
.toStream();
indexTickerStream.to(sinkTopic, Produced.with(...));
我的测试使用 EmbeddedKafka,将一堆记录发布到主题,并坐在阻塞队列中等待记录到达 sinkTopic
。
我对这种聚合如何随时间变化很感兴趣,所以我希望断言每个输出代码的平均值。我可能会添加一定程度的窗口,但我现在尽量保持简单。
当我 运行 我的测试时,我得到了不同的结果。假设我的拓扑中有 10 个输入记录:
- 我的聚合器被调用了 10 次
- 我在
AverageTick
序列化程序中放置的断点被调用的次数不同。
- 我在测试中声明了记录的值。
我认为这是因为 KIP-63 中定义的缓存功能 - 记录很快出现在处理节点上,并且 coalesced/overwritten 具有最新记录。 (虽然我不完全确定。)
我的单元测试通过 ProcessorTopologyTestDriver
,但我正在尝试为包含此逻辑的服务编写一些验收测试。
我也试过使用我的 commit.interval.ms
配置,以及在发布我的输入记录之间休眠,以取得不同程度的(不稳定的)成功。
- 这些测试有意义吗?
- 如何针对真实的 Kafka 实例断言此微服务的正确性?
我觉得我在这里做的事情在概念上是错误的 - 我只是不知道可以采取什么其他方法。
您的观察是正确的。缓存使测试变得困难,因为它引入了不确定性。
要编写有用的测试,您有两个选择:
- 通过将缓存大小设置为零来禁用缓存(这样,所有输出记录,包括所有中间记录都是确定性的)
- 只检查每个键的最后一个结果记录(最后一个结果必须始终相同,与固定输入数据的缓存无关)
顺便说一句:在即将到来的 1.1 中,Kafka 添加了一个 public 测试包,我们计划添加更多:https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams
我已经设置了一个简单的聚合,将来自多个流的值放在一起,我正在尝试对其进行测试。我已经花了很多时间,但似乎无法直接理解这些概念。我的直播如下:
// Combine multiple streams together.
KStream<String, IndividualTick> tickerStream =
priceIndexStreamBuilder.stream(exchangeTopics, Consumed.with(...));
// Group by a key & compute average per key
KStream<K, AveragedTick> avgTickerStream = tickStream.selectKey((key,
value) -> value.getK())
.groupByKey(...)
.aggregate(AvgTick::new,
(key, value, aggregate) -> {
aggregate.addTick(value);
return aggregate;
},
Materialized.with(...))
.toStream();
indexTickerStream.to(sinkTopic, Produced.with(...));
我的测试使用 EmbeddedKafka,将一堆记录发布到主题,并坐在阻塞队列中等待记录到达 sinkTopic
。
我对这种聚合如何随时间变化很感兴趣,所以我希望断言每个输出代码的平均值。我可能会添加一定程度的窗口,但我现在尽量保持简单。
当我 运行 我的测试时,我得到了不同的结果。假设我的拓扑中有 10 个输入记录:
- 我的聚合器被调用了 10 次
- 我在
AverageTick
序列化程序中放置的断点被调用的次数不同。 - 我在测试中声明了记录的值。
我认为这是因为 KIP-63 中定义的缓存功能 - 记录很快出现在处理节点上,并且 coalesced/overwritten 具有最新记录。 (虽然我不完全确定。)
我的单元测试通过 ProcessorTopologyTestDriver
,但我正在尝试为包含此逻辑的服务编写一些验收测试。
我也试过使用我的 commit.interval.ms
配置,以及在发布我的输入记录之间休眠,以取得不同程度的(不稳定的)成功。
- 这些测试有意义吗?
- 如何针对真实的 Kafka 实例断言此微服务的正确性?
我觉得我在这里做的事情在概念上是错误的 - 我只是不知道可以采取什么其他方法。
您的观察是正确的。缓存使测试变得困难,因为它引入了不确定性。
要编写有用的测试,您有两个选择:
- 通过将缓存大小设置为零来禁用缓存(这样,所有输出记录,包括所有中间记录都是确定性的)
- 只检查每个键的最后一个结果记录(最后一个结果必须始终相同,与固定输入数据的缓存无关)
顺便说一句:在即将到来的 1.1 中,Kafka 添加了一个 public 测试包,我们计划添加更多:https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams