不能在测试中多次使用 @KafkaListener
Can't use a @KafkaListener more than once in a Test
我们正在尝试测试一个 cloud-stream-kafka 应用程序,在测试中我们有多个测试方法发送消息,并且有一个 @KafkaListener 等待响应。
但是,第一个测试往往会通过,而第二个测试往往会失败。
任何指点将不胜感激。
@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@EmbeddedKafka(topics = "input", partitions = 1)
@DirtiesContext
class EmbeddedKafkaListenerTest {
private CountDownLatch latch;
private String message;
@BeforeEach
void setUp() {
this.message = null;
this.latch = new CountDownLatch(1);
}
@Test
void testSendFirstMessage(@Autowired KafkaTemplate<String, byte[]> template)
throws InterruptedException {
template.send("input", "Hello World 1".getBytes());
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertEquals("Hello World 1", message);
}
@Test
void testSendSecondMessage(@Autowired KafkaTemplate<String, byte[]> template)
throws InterruptedException {
template.send("input", "Hello World 2".getBytes());
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertEquals("Hello World 2", message);
}
@KafkaListener(topics = "input", id = "kafka-listener-consumer")
void listener(Message<byte[]> message) {
this.message = new String(message.getPayload(), StandardCharsets.UTF_8);
this.latch.countDown();
}
}
似乎每个测试都注册了一个 @KafkaListener
实例,因为我们注意到使用 id
值会导致 java.lang.IllegalStateException: Another endpoint is already registered with id 'kafka-listener-consumer'
当使用的消息传递框架是 RabbitMQ 时,我使用 @RabbitListener
做过类似的测试。我希望我能做类似的事情,因为一些测试用例涉及等待没有消息被发布,我们可以用 assertFalse(latch.await(10, TimeUnit.SECONDS))
我认为你的 @KafkaListener
方法应该进入 @Configuration
class,否则 EmbeddedKafkaListenerTest
确实是每个测试方法实例化的,因此 @KafkaListener
有多少测试方法就被解析多少。
另一种方法是使用 @DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
,因此您不仅会为每个测试方法获得新鲜的 EmbeddedKafkaListenerTest
实例,而且还会 Spring ApplicationContext
.
还有一种使用 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
的方法,因此整个测试套件只有一个 EmbeddedKafkaListenerTest
,并且您的 @KafkaListener
不会被解析多次。
我们正在尝试测试一个 cloud-stream-kafka 应用程序,在测试中我们有多个测试方法发送消息,并且有一个 @KafkaListener 等待响应。
但是,第一个测试往往会通过,而第二个测试往往会失败。
任何指点将不胜感激。
@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@EmbeddedKafka(topics = "input", partitions = 1)
@DirtiesContext
class EmbeddedKafkaListenerTest {
private CountDownLatch latch;
private String message;
@BeforeEach
void setUp() {
this.message = null;
this.latch = new CountDownLatch(1);
}
@Test
void testSendFirstMessage(@Autowired KafkaTemplate<String, byte[]> template)
throws InterruptedException {
template.send("input", "Hello World 1".getBytes());
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertEquals("Hello World 1", message);
}
@Test
void testSendSecondMessage(@Autowired KafkaTemplate<String, byte[]> template)
throws InterruptedException {
template.send("input", "Hello World 2".getBytes());
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertEquals("Hello World 2", message);
}
@KafkaListener(topics = "input", id = "kafka-listener-consumer")
void listener(Message<byte[]> message) {
this.message = new String(message.getPayload(), StandardCharsets.UTF_8);
this.latch.countDown();
}
}
似乎每个测试都注册了一个 @KafkaListener
实例,因为我们注意到使用 id
值会导致 java.lang.IllegalStateException: Another endpoint is already registered with id 'kafka-listener-consumer'
当使用的消息传递框架是 RabbitMQ 时,我使用 @RabbitListener
做过类似的测试。我希望我能做类似的事情,因为一些测试用例涉及等待没有消息被发布,我们可以用 assertFalse(latch.await(10, TimeUnit.SECONDS))
我认为你的 @KafkaListener
方法应该进入 @Configuration
class,否则 EmbeddedKafkaListenerTest
确实是每个测试方法实例化的,因此 @KafkaListener
有多少测试方法就被解析多少。
另一种方法是使用 @DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
,因此您不仅会为每个测试方法获得新鲜的 EmbeddedKafkaListenerTest
实例,而且还会 Spring ApplicationContext
.
还有一种使用 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
的方法,因此整个测试套件只有一个 EmbeddedKafkaListenerTest
,并且您的 @KafkaListener
不会被解析多次。