单元测试 MessageListener class
Unit testing MessageListener class
如何对实现 spring-kafka MessageListener 接口的 class 进行单元测试?我有一个侦听器 class,我正在使用 onMessage 函数手动收听主题。这个函数非常简单,只是接收消息。
我的环境是 Spring 5.8、Spring-Kafka 2.2.7、Spring-Kafka-Test 和 JUnit,并且没有使用 Spring Boot。
我一直在尝试 Spring 参考文档和其他帖子中的许多不同示例,但似乎没有一个展示了测试实现 MessageListener 的监听器类的简单方法。
我不确定是否需要设置 EmbeddedKafkaBroker 或 EmbeddedKafkaRule,或者是否有不同的测试方法。当我尝试使用 EmbeddedKafkaRule 时,我收到一条错误消息,显示 NoClassDefFound。
但是我不明白这个测试用例如何才能触发我的 onMessage 函数。
@RunWith(SpringJUnit4ClassRunner.class)
@DirtiesContext
public class listenerTest {
private String topic = "someTopic";
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, topic);
private CountDownLatch countDownLatch;
@Before
public void setUpTests (){
Map<String, Object> sProps = KafkaTestUtils.senderProps(embeddedKafka.getEmbeddedKafka().getBrokersAsString());
ProducerFactory producer = new DefaultKafkaProducerFactory<String, String> (sProps);
kafkaTemplate = new KafkaTemplate<>(producer);
kafkaTemplate.setDefaultTopic(topic);
countDownLatch = new CountDownLatch (1);
}
@Test
public void testReceiver(){
kafkaTemplate.sendDefault("message");
assertEquals(latch.getCount(), 0);
}
我要进行单元测试的类:
/**
 * Batch listener that notifies a callback and then acknowledges the received batch.
 */
public class listener implements BatchAcknowledgingMessageListener<String, String> {

    /** Callback invoked once for every batch handed to {@link #onMessage}. */
    private final CallbackInterface callback;

    /**
     * @param callback the callback to notify when a batch arrives
     */
    public listener(CallbackInterface callback) {
        // The field is named "callback"; the previous "callbackI" did not exist.
        this.callback = callback;
    }

    /**
     * Notifies the callback, then acknowledges the batch.
     *
     * @param records the received records (not inspected here)
     * @param ack the acknowledgment to call after the callback has run
     */
    @Override
    public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        this.callback.handleMessage();
        ack.acknowledge();
    }
}
这会抛出一个奇怪的错误,说明... NoClassDefFound
对于纯单元测试,不需要嵌入式代理,直接调用监听器即可。
注入一个模拟回调并验证它是否被正确调用。
When I tried to directly call the onMessage function to test it I get an error that says Container should not be calling function onMessage.
你调用了错误的 onMessage 方法。
...
/**
 * Listener variant that receives messages as a list via {@code onMessageBatch}.
 * The single-message {@code onMessage} is overridden with a default that always throws.
 */
public interface BatchMessageListener extends MessageListener {
// Single-message entry point: this default implementation unconditionally throws
// UnsupportedOperationException, so calling it directly always fails.
@Override
default void onMessage(Message message) {
throw new UnsupportedOperationException("Should never be called by the container");
}
// Batch entry point that implementers must provide; receives the list of messages.
@Override
void onMessageBatch(List<Message> messages);
}
编辑
/**
 * Batch listener that forwards the value of every record to a {@link MyService}
 * and then acknowledges the batch.
 */
public class MyListener implements BatchAcknowledgingMessageListener<String, String> {

    /** Collaborator that receives each record value. */
    private final MyService service;

    /**
     * @param service the service to call with each record value
     */
    public MyListener(MyService service) {
        this.service = service;
    }

    /**
     * Calls the service once per record, in list order, then acknowledges.
     *
     * @param data the received records
     * @param acknowledgment acknowledged after all records have been forwarded
     */
    @Override
    public void onMessage(List<ConsumerRecord<String, String>> data, Acknowledgment acknowledgment) {
        for (ConsumerRecord<String, String> entry : data) {
            service.call(entry.value());
        }
        acknowledgment.acknowledge();
    }

    /** Callback contract used by {@link MyListener}. */
    public interface MyService {

        /**
         * @param toCall the value of a received record
         */
        void call(String toCall);
    }
}
和
/**
 * Pure unit test for {@code MyListener}: calls {@code onMessage} directly with
 * mocked collaborators instead of going through a broker.
 */
class So57192362ApplicationTests {

    @Test
    void test() {
        // Arrange: mocked collaborators and a single record whose value is "bar".
        Acknowledgment acknowledgment = mock(Acknowledgment.class);
        MyService service = mock(MyService.class);
        ConsumerRecord<String, String> single = new ConsumerRecord<>("foo", 0, 0L, null, "bar");
        MyListener listener = new MyListener(service);

        // Act: deliver a one-element batch straight to the listener.
        listener.onMessage(Collections.singletonList(single), acknowledgment);

        // Assert: the value was forwarded, the batch acknowledged, and nothing else happened.
        verify(service).call("bar");
        verify(acknowledgment).acknowledge();
        verifyNoMoreInteractions(service, acknowledgment);
    }
}
如何对实现 spring-kafka MessageListener 接口的 class 进行单元测试?我有一个侦听器 class,我正在使用 onMessage 函数手动收听主题。这个函数非常简单,只是接收消息。
我的环境是 Spring 5.8、Spring-Kafka 2.2.7、Spring-Kafka-Test 和 JUnit,并且没有使用 Spring Boot。
我一直在尝试 Spring 参考文档和其他帖子中的许多不同示例,但似乎没有一个展示了测试实现 MessageListener 的监听器类的简单方法。
我不确定是否需要设置 EmbeddedKafkaBroker 或 EmbeddedKafkaRule,或者是否有不同的测试方法。当我尝试使用 EmbeddedKafkaRule 时,我收到一条错误消息,显示 NoClassDefFound。
但是我不明白这个测试用例如何才能触发我的 onMessage 函数。
@RunWith(SpringJUnit4ClassRunner.class)
@DirtiesContext
public class listenerTest {
private String topic = "someTopic";
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, topic);
private CountDownLatch countDownLatch;
@Before
public void setUpTests (){
Map<String, Object> sProps = KafkaTestUtils.senderProps(embeddedKafka.getEmbeddedKafka().getBrokersAsString());
ProducerFactory producer = new DefaultKafkaProducerFactory<String, String> (sProps);
kafkaTemplate = new KafkaTemplate<>(producer);
kafkaTemplate.setDefaultTopic(topic);
countDownLatch = new CountDownLatch (1);
}
@Test
public void testReceiver(){
kafkaTemplate.sendDefault("message");
assertEquals(latch.getCount(), 0);
}
我要进行单元测试的类:
/**
 * Batch listener that notifies a callback and then acknowledges the received batch.
 */
public class listener implements BatchAcknowledgingMessageListener<String, String> {

    /** Callback invoked once for every batch handed to {@link #onMessage}. */
    private final CallbackInterface callback;

    /**
     * @param callback the callback to notify when a batch arrives
     */
    public listener(CallbackInterface callback) {
        // The field is named "callback"; the previous "callbackI" did not exist.
        this.callback = callback;
    }

    /**
     * Notifies the callback, then acknowledges the batch.
     *
     * @param records the received records (not inspected here)
     * @param ack the acknowledgment to call after the callback has run
     */
    @Override
    public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        this.callback.handleMessage();
        ack.acknowledge();
    }
}
这会抛出一个奇怪的错误,说明... NoClassDefFound
对于纯单元测试,不需要嵌入式代理,直接调用监听器即可。
注入一个模拟回调并验证它是否被正确调用。
When I tried to directly call the onMessage function to test it I get an error that says Container should not be calling function onMessage.
你调用了错误的 onMessage 方法。
...
/**
 * Listener variant that receives messages as a list via {@code onMessageBatch}.
 * The single-message {@code onMessage} is overridden with a default that always throws.
 */
public interface BatchMessageListener extends MessageListener {
// Single-message entry point: this default implementation unconditionally throws
// UnsupportedOperationException, so calling it directly always fails.
@Override
default void onMessage(Message message) {
throw new UnsupportedOperationException("Should never be called by the container");
}
// Batch entry point that implementers must provide; receives the list of messages.
@Override
void onMessageBatch(List<Message> messages);
}
编辑
/**
 * Batch listener that forwards the value of every record to a {@link MyService}
 * and then acknowledges the batch.
 */
public class MyListener implements BatchAcknowledgingMessageListener<String, String> {

    /** Collaborator that receives each record value. */
    private final MyService service;

    /**
     * @param service the service to call with each record value
     */
    public MyListener(MyService service) {
        this.service = service;
    }

    /**
     * Calls the service once per record, in list order, then acknowledges.
     *
     * @param data the received records
     * @param acknowledgment acknowledged after all records have been forwarded
     */
    @Override
    public void onMessage(List<ConsumerRecord<String, String>> data, Acknowledgment acknowledgment) {
        for (ConsumerRecord<String, String> entry : data) {
            service.call(entry.value());
        }
        acknowledgment.acknowledge();
    }

    /** Callback contract used by {@link MyListener}. */
    public interface MyService {

        /**
         * @param toCall the value of a received record
         */
        void call(String toCall);
    }
}
和
/**
 * Pure unit test for {@code MyListener}: calls {@code onMessage} directly with
 * mocked collaborators instead of going through a broker.
 */
class So57192362ApplicationTests {

    @Test
    void test() {
        // Arrange: mocked collaborators and a single record whose value is "bar".
        Acknowledgment acknowledgment = mock(Acknowledgment.class);
        MyService service = mock(MyService.class);
        ConsumerRecord<String, String> single = new ConsumerRecord<>("foo", 0, 0L, null, "bar");
        MyListener listener = new MyListener(service);

        // Act: deliver a one-element batch straight to the listener.
        listener.onMessage(Collections.singletonList(single), acknowledgment);

        // Assert: the value was forwarded, the batch acknowledged, and nothing else happened.
        verify(service).call("bar");
        verify(acknowledgment).acknowledge();
        verifyNoMoreInteractions(service, acknowledgment);
    }
}