在 Spring Cloud Stream 中使用嵌入式 Kafka 进行集成测试时,如何立即验证消息是否已被确认?
How can you verify immediately that a message was acknowledged when integration testing using Embedded Kafka in Spring Cloud Stream?
我们使用 Spring Cloud Stream Kafka Binder(与 Project Reactor 集成,即
Flux
streams)
manual offset commits
(即 autoCommitOffset = false
)。
我们正在尝试使用 Embedded Kafka from spring-kafka-test 编写集成测试
这应该断言这一切都有效,通过手动读取消费者组偏移量,使用 admin client,
测试前后发消息给我们的话题。
测试间歇性失败。使用 awaitility 我们现在最多等待 10 秒来轮询偏移量,
这似乎解决了我们的大部分问题,因为偏移量将在大约 7 秒后发生变化 - 但这对于测试来说并不令人满意。
有没有办法确保 Spring 一旦我们通过调用 Acknowledgement.acknowledge()
手动确认消息接收,Cloud Stream Kafka Binder 将立即写入偏移量更改?
换句话说:我们如何验证 acknowledge
在我们的测试中被调用而无需等待?
我们使用 Kotlin、Mockito 和 Mockito-kotlin,因此不能使用 PowerMockito。
问题是 Consumer
不是线程安全的。提交必须在容器线程上完成。如果当您确认时消费者正坐在 poll()
中,则您必须等待最多 pollTimeout
才能提交偏移量。
默认 pollTimeout
为 5 秒。
可以加一个ListenerContainerCustomizer
@Bean
来修改ContainerProperties
.
我们使用 Spring Cloud Stream Kafka Binder(与 Project Reactor 集成,即
Flux
streams)
manual offset commits
(即 autoCommitOffset = false
)。
我们正在尝试使用 Embedded Kafka from spring-kafka-test 编写集成测试 这应该断言这一切都有效,通过手动读取消费者组偏移量,使用 admin client, 测试前后发消息给我们的话题。
测试间歇性失败。使用 awaitility 我们现在最多等待 10 秒来轮询偏移量, 这似乎解决了我们的大部分问题,因为偏移量将在大约 7 秒后发生变化 - 但这对于测试来说并不令人满意。
有没有办法确保 Spring 一旦我们通过调用 Acknowledgement.acknowledge()
手动确认消息接收,Cloud Stream Kafka Binder 将立即写入偏移量更改?
换句话说:我们如何验证 acknowledge
在我们的测试中被调用而无需等待?
我们使用 Kotlin、Mockito 和 Mockito-kotlin,因此不能使用 PowerMockito。
问题是 Consumer
不是线程安全的。提交必须在容器线程上完成。如果当您确认时消费者正坐在 poll()
中,则您必须等待最多 pollTimeout
才能提交偏移量。
默认 pollTimeout
为 5 秒。
可以加一个ListenerContainerCustomizer
@Bean
来修改ContainerProperties
.