使用 Spring 嵌入式 Kafka 测试 @KafkaListener
Testing a @KafkaListener using Spring Embedded Kafka
我正在尝试为使用 Spring Boot 2.x 开发的 Kafka 侦听器编写单元测试。作为一个单元测试,我不想启动一个完整的 Kafka 服务器作为 Zookeeper 的一个实例。所以,我决定使用 Spring Embedded Kafka。
我的监听器的定义非常基础。
@Component
public class Listener {
private final CountDownLatch latch;
@Autowired
public Listener(CountDownLatch latch) {
this.latch = latch;
}
@KafkaListener(topics = "sample-topic")
public void listen(String message) {
latch.countDown();
}
}
另外,在收到消息后验证 latch
计数器是否为零的测试也非常简单。
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {
@Autowired
private KafkaEmbedded embeddedKafka;
@Autowired
private CountDownLatch latch;
private KafkaTemplate<Integer, String> producer;
@Before
public void setUp() {
this.producer = buildKafkaTemplate();
this.producer.setDefaultTopic("sample-topic");
}
private KafkaTemplate<Integer, String> buildKafkaTemplate() {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
return new KafkaTemplate<>(pf);
}
@Test
public void listenerShouldConsumeMessages() throws InterruptedException {
// Given
producer.sendDefault(1, "Hello world");
// Then
assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
}
}
不幸的是,测试失败了,我不明白为什么。是否可以使用 KafkaEmbedded
的实例来测试标有注释 @KafkaListener
的方法?
所有代码都在我的 GitHub 存储库 kafka-listener 中共享。
感谢大家。
您可能在为消费者分配 topic/partition 之前发送消息。设置 属性...
spring:
kafka:
consumer:
auto-offset-reset: earliest
...默认为 latest
。
这就像将 --from-beginning
与控制台使用者一起使用。
编辑
哦;你没有使用启动的属性。
添加
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
EDIT2
顺便说一句,您可能还应该对 template.send()
(Future<>
)的结果执行 get(10L, TimeUnit.SECONDS)
以断言发送成功。
EDIT3
要仅针对测试覆盖偏移量重置,您可以执行与代理地址相同的操作:
@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;
...
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);
和
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.auto-offset-reset=earliest"})
但是,请记住,此 属性 仅适用于组的第一次消费。要在每次应用程序启动时始终从末尾开始,您必须在启动期间寻找到末尾。
此外,我建议将 enable.auto.commit
设置为 false
,以便容器负责提交偏移量,而不是仅仅依赖于消费者客户端按时执行。
也许有人会觉得这很有用。我有一个类似的问题。
本地测试是 运行(一些检查是在 Awaitility.waitAtMost
内执行的)但在 Jenkins 管道中,测试失败。
就像投票最多的答案中已经提到的那样,解决方案是设置 auto-offset-reset=earliest
。
当测试为 运行 时,您可以通过查看测试日志来检查您是否正确设置了配置。 Spring 生产者和消费者的输出配置
我正在尝试为使用 Spring Boot 2.x 开发的 Kafka 侦听器编写单元测试。作为一个单元测试,我不想启动一个完整的 Kafka 服务器作为 Zookeeper 的一个实例。所以,我决定使用 Spring Embedded Kafka。
我的监听器的定义非常基础。
@Component
public class Listener {
private final CountDownLatch latch;
@Autowired
public Listener(CountDownLatch latch) {
this.latch = latch;
}
@KafkaListener(topics = "sample-topic")
public void listen(String message) {
latch.countDown();
}
}
另外,在收到消息后验证 latch
计数器是否为零的测试也非常简单。
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {
@Autowired
private KafkaEmbedded embeddedKafka;
@Autowired
private CountDownLatch latch;
private KafkaTemplate<Integer, String> producer;
@Before
public void setUp() {
this.producer = buildKafkaTemplate();
this.producer.setDefaultTopic("sample-topic");
}
private KafkaTemplate<Integer, String> buildKafkaTemplate() {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
return new KafkaTemplate<>(pf);
}
@Test
public void listenerShouldConsumeMessages() throws InterruptedException {
// Given
producer.sendDefault(1, "Hello world");
// Then
assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
}
}
不幸的是,测试失败了,我不明白为什么。是否可以使用 KafkaEmbedded
的实例来测试标有注释 @KafkaListener
的方法?
所有代码都在我的 GitHub 存储库 kafka-listener 中共享。
感谢大家。
您可能在为消费者分配 topic/partition 之前发送消息。设置 属性...
spring:
kafka:
consumer:
auto-offset-reset: earliest
...默认为 latest
。
这就像将 --from-beginning
与控制台使用者一起使用。
编辑
哦;你没有使用启动的属性。
添加
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
EDIT2
顺便说一句,您可能还应该对 template.send()
(Future<>
)的结果执行 get(10L, TimeUnit.SECONDS)
以断言发送成功。
EDIT3
要仅针对测试覆盖偏移量重置,您可以执行与代理地址相同的操作:
@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;
...
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);
和
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.auto-offset-reset=earliest"})
但是,请记住,此 属性 仅适用于组的第一次消费。要在每次应用程序启动时始终从末尾开始,您必须在启动期间寻找到末尾。
此外,我建议将 enable.auto.commit
设置为 false
,以便容器负责提交偏移量,而不是仅仅依赖于消费者客户端按时执行。
也许有人会觉得这很有用。我有一个类似的问题。
本地测试是 运行(一些检查是在 Awaitility.waitAtMost
内执行的)但在 Jenkins 管道中,测试失败。
就像投票最多的答案中已经提到的那样,解决方案是设置 auto-offset-reset=earliest
。
当测试为 运行 时,您可以通过查看测试日志来检查您是否正确设置了配置。 Spring 生产者和消费者的输出配置