在 Spring Cloud Stream 中使用嵌入式 Kafka 进行集成测试时,如何立即验证消息是否已被确认?

How can you verify immediately that a message was acknowledged when integration testing using Embedded Kafka in Spring Cloud Stream?

我们使用 Spring Cloud Stream Kafka Binder(与 Project Reactor 集成,即 Flux streams) manual offset commits (即 autoCommitOffset = false)。

我们正在尝试使用 Embedded Kafka from spring-kafka-test 编写集成测试 这应该断言这一切都有效,通过手动读取消费者组偏移量,使用 admin client, 测试前后发消息给我们的话题。

测试间歇性失败。使用 awaitility 我们现在最多等待 10 秒来轮询偏移量, 这似乎解决了我们的大部分问题,因为偏移量将在大约 7 秒后发生变化 - 但这对于测试来说并不令人满意。

有没有办法确保 Spring 一旦我们通过调用 Acknowledgement.acknowledge() 手动确认消息接收,Cloud Stream Kafka Binder 将立即写入偏移量更改?

换句话说:我们如何验证 acknowledge 在我们的测试中被调用而无需等待?

我们使用 Kotlin、Mockito 和 Mockito-kotlin,因此不能使用 PowerMockito。

问题是 Consumer 不是线程安全的。提交必须在容器线程上完成。如果当您确认时消费者正坐在 poll() 中,则您必须等待最多 pollTimeout 才能提交偏移量。

默认 pollTimeout 为 5 秒。

可以加一个ListenerContainerCustomizer@Bean来修改ContainerProperties.