Spring Kafka KafkaListener 在有断点时获取消息,没有断点
Spring Kafka KafkaListener picks up message when there's a breakpoint, doesn't without one
我有一个简单的集成测试,启动嵌入式 kafka,向主题发送消息,将有效负载分配给变量并比较值。
听众:
@KafkaListener( topics = "topic.name", groupId = "group-id")
public void receive(@Payload ConsumerRecord<?, ?> consumerRecord) {
setPayload(consumerRecord.toString());
}
测试(使用等待延迟)
@Test
public void canReceiveAMessage() {
producer.send(topic, TEST_MESSAGE);
await().pollDelay(10, TimeUnit.SECONDS).atMost(30, TimeUnit.SECONDS).untilAsserted(()
-> assertThat(consumer.getPayload()).contains(TEST_MESSAGE);
}
我假设断点解决了某种竞争条件——但是,有了断点,它的解决速度超过 30 秒,所以我不确定它似乎触发了什么让它起作用。我可以验证它的其余部分是否正常,因为它适用于断点。
编辑:如果我将消费者配置发送到 AUTO_OFFSET_RESET_CONFIG="earliest",它就会可靠地通过。现在,我不想要这个长期的,它想要“最新的” - 有可靠的解决方案吗?
断点导致生产者线程刷新其批处理。
调用producer.send(topic, TEST_MESSAGE).get(Duration.ofMillis(100);
以便正确等待
您可以在容器配置中添加一个ConsumerRebalanceListener
,等待分区分配完成后再发送测试消息。
我有一个简单的集成测试,启动嵌入式 kafka,向主题发送消息,将有效负载分配给变量并比较值。
听众:
@KafkaListener( topics = "topic.name", groupId = "group-id")
public void receive(@Payload ConsumerRecord<?, ?> consumerRecord) {
setPayload(consumerRecord.toString());
}
测试(使用等待延迟)
@Test
public void canReceiveAMessage() {
producer.send(topic, TEST_MESSAGE);
await().pollDelay(10, TimeUnit.SECONDS).atMost(30, TimeUnit.SECONDS).untilAsserted(()
-> assertThat(consumer.getPayload()).contains(TEST_MESSAGE);
}
我假设断点解决了某种竞争条件——但是,有了断点,它的解决速度超过 30 秒,所以我不确定它似乎触发了什么让它起作用。我可以验证它的其余部分是否正常,因为它适用于断点。
编辑:如果我将消费者配置发送到 AUTO_OFFSET_RESET_CONFIG="earliest",它就会可靠地通过。现在,我不想要这个长期的,它想要“最新的” - 有可靠的解决方案吗?
断点导致生产者线程刷新其批处理。
调用producer.send(topic, TEST_MESSAGE).get(Duration.ofMillis(100);
以便正确等待
您可以在容器配置中添加一个ConsumerRebalanceListener
,等待分区分配完成后再发送测试消息。