Spring Kafka KafkaListener 在有断点时获取消息,没有断点

Spring Kafka KafkaListener picks up message when there's a breakpoint, doesn't without one

我有一个简单的集成测试,启动嵌入式 kafka,向主题发送消息,将有效负载分配给变量并比较值。

听众:

@KafkaListener( topics = "topic.name", groupId = "group-id")
public void receive(@Payload ConsumerRecord<?, ?> consumerRecord) {
    setPayload(consumerRecord.toString());
}

测试(使用等待延迟)

@Test
public void canReceiveAMessage() {
    producer.send(topic, TEST_MESSAGE);
    await().pollDelay(10, TimeUnit.SECONDS).atMost(30, TimeUnit.SECONDS).untilAsserted(() 
        -> assertThat(consumer.getPayload()).contains(TEST_MESSAGE);
}

我假设断点解决了某种竞争条件——但是,有了断点,它的解决速度超过 30 秒,所以我不确定它似乎触发了什么让它起作用。我可以验证它的其余部分是否正常,因为它适用于断点。

编辑:如果我将消费者配置发送到 AUTO_OFFSET_RESET_CONFIG="earliest",它就会可靠地通过。现在,我不想要这个长期的,它想要“最新的” - 有可靠的解决方案吗?

断点导致生产者线程刷新其批处理。

调用producer.send(topic, TEST_MESSAGE).get(Duration.ofMillis(100);以便正确等待

您可以在容器配置中添加一个ConsumerRebalanceListener,等待分区分配完成后再发送测试消息。