Mockito unit tests: stubbing a method throws NullPointerException

I am writing some unit tests for the run() method in the class below:

public class TopicProcessor implements Runnable {

     private final KafkaConfig config;
     private final String inTopic;
     private final String outTopic;
     private final String inBroker;
     private final String outBroker;
     private KafkaConsumer<Long, byte[]> kafkaConsumer;
     private KafkaProducer<Long, byte[]> kafkaProducer;


     public void setKafkaConsumer(KafkaConsumer<Long, byte[]> kafkaConsumer) {
         this.kafkaConsumer = kafkaConsumer;
     }

     public void setKafkaProducer(KafkaProducer<Long, byte[]> kafkaProducer) {
         this.kafkaProducer = kafkaProducer;
     }

     public TopicProcessor(String topicKey, KafkaConfig config) {
         this.config = config;
         Map<String, Map<String, String>> mapping = this.config.getMapping(topicKey);
         this.inTopic = mapping.get("in").get("topic");
         this.outTopic = mapping.get("out").get("topic");
         this.inBroker = mapping.get("in").get("broker");
         this.outBroker = mapping.get("out").get("broker");
     }


     @Override
     public void run() {
         long now;
         getKafkaConsumer().subscribe(Collections.singletonList(this.inTopic));
         while (true) {
             ConsumerRecords<Long, byte[]> records = getKafkaConsumer().poll(Duration.ofMillis(100));
             meterInfo("consumer.received." + this.inTopic, records.count());
             for (ConsumerRecord<Long, byte[]> record : records) {
                 now = currentTimeMillis();
                 histogram("consumer.latency." + this.inTopic, now - record.timestamp());
                 getKafkaProducer().send(new ProducerRecord<>(this.outTopic, record.key(), record.value()));
                 meterInfo("publisher.sent." + this.outTopic);
             }
         }
     }
}

Basically, in the run() method I subscribe to a topic, poll for records with KafkaConsumer.poll(), and forward each record with KafkaProducer.send().

I need to write unit tests for the following scenarios:

  1. Verify that KafkaConsumer.poll() is called once
  2. Verify that KafkaProducer.send() is called once

Here is my test method:

@Test
public void test(){
    //Mocks
    KafkaConsumer<Long, byte[]> consumerMock = mock(KafkaConsumer.class);
    KafkaProducer<Long, byte[]> producerMock = mock(KafkaProducer.class);

    //test data
    Map<TopicPartition, List<ConsumerRecord<Long, byte[]>>> records = new LinkedHashMap<>();
    records.put(new TopicPartition("topic", 0), new ArrayList<>());
    ConsumerRecords<Long, byte[]> consumerRecords = new ConsumerRecords<>(records);

    ProducerRecord<Long, byte[]> producer_record = new ProducerRecord<Long, byte[]>("topic", 1, 0L, 1L, new byte[]{'a'});

    //object creation to call the method
    KafkaConfig kafkaConfig = new KafkaConfig(yamlConfig);
    TopicProcessor topicProcessor = new TopicProcessor("pacing_record", kafkaConfig);

    //stubbing Kafka methods - poll and send
    when(consumerMock.poll(Duration.ofMillis(anyInt()))).thenReturn(consumerRecords);  
    doReturn("sent").when(producerMock.send(producer_record, null));

    //Injecting mocks through setter methods
    topicProcessor.setKafkaConsumer(consumerMock);
    topicProcessor.setKafkaProducer(producerMock);

    //calling a class method run() from above class to verify that kafka methods are called
    topicProcessor.run();

    verify(consumerMock, times(1)).poll(Duration.ofMillis(anyInt()));

}

The following line in the test method above throws a NullPointerException:

    when(kafkaConsumerMock.poll(Duration.ofMillis(anyInt()))).thenReturn(consumerRecords);

Does anyone know how to stub this method correctly and verify that it was called?

when(kafkaConsumerMock.poll(Duration.ofMillis(anyInt())))

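This is incorrect usage of Mockito. The any* matcher methods can only be used as direct arguments of the mocked method call; here anyInt() is passed to Duration.ofMillis(), not to poll().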
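
To see why the position of the matcher matters, here is a toy model in plain Java. It is only a sketch: every name in it (ToyMatchers, ToyConsumerMock, ToyDemo) is made up for illustration and it is not Mockito's real code. It assumes matchers behave roughly like those in Mockito 2 and later: calling a matcher records it as a side effect and returns a dummy value, and the next call on the mock picks up whatever was recorded. It does not reproduce the exact NullPointerException from the question.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

// Toy model only: hypothetical names, not Mockito's implementation.
class ToyMatchers {
    // Matchers recorded since the last stubbing call on a mock.
    static final Deque<Predicate<Object>> RECORDED = new ArrayDeque<>();

    // Modeled on anyInt(): records an Integer matcher, returns a dummy 0.
    static int anyInt() {
        RECORDED.add(arg -> arg instanceof Integer);
        return 0;
    }

    // Modeled on any(Duration.class): records a Duration matcher, returns a dummy null.
    static Duration anyDuration() {
        RECORDED.add(arg -> arg instanceof Duration);
        return null;
    }
}

class ToyConsumerMock {
    // The matcher captured when poll() was stubbed.
    private Predicate<Object> stubbedArg;

    // Stubbing call: the dummy argument is ignored, the recorded matcher is used instead.
    void stubPoll(Duration ignoredDummy) {
        stubbedArg = ToyMatchers.RECORDED.pollLast();
    }

    // Reports whether the stub would apply to a real call with this argument.
    boolean pollMatchesStub(Duration timeout) {
        return stubbedArg != null && stubbedArg.test(timeout);
    }
}

class ToyDemo {
    // Misuse: the Integer matcher ends up attached to a Duration parameter.
    static boolean misuse() {
        ToyConsumerMock mock = new ToyConsumerMock();
        mock.stubPoll(Duration.ofMillis(ToyMatchers.anyInt()));
        return mock.pollMatchesStub(Duration.ofMillis(100));
    }

    // Correct: the matcher is the direct argument and has the right type.
    static boolean correct() {
        ToyConsumerMock mock = new ToyConsumerMock();
        mock.stubPoll(ToyMatchers.anyDuration());
        return mock.pollMatchesStub(Duration.ofMillis(100));
    }
}
```

In this model ToyDemo.misuse() returns false and ToyDemo.correct() returns true: wrapping the matcher in Duration.ofMillis() leaves poll() with a matcher that can never accept a Duration.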

I think what you are trying to express here is "for any duration, I don't care how long it is". If so, you can express that as:

when(kafkaConsumerMock.poll(any(Duration.class)))

or

when(kafkaConsumerMock.poll(any()))

if there are no ambiguous overloads.

The same applies to the verification:

verify(kafkaConsumerMock, times(1)).poll(any(Duration.class))
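
Put together, a minimal sketch of the corrected stubbing and verification might look like the following. To keep it small, it mocks a hypothetical RecordSource interface instead of KafkaConsumer; that interface and the class name DurationMatcherSketch are assumptions made for illustration, not part of the question. It needs only Mockito on the classpath.

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.time.Duration;
import java.util.List;

// Hypothetical stand-in for KafkaConsumer, so the sketch does not need Kafka.
interface RecordSource {
    List<String> poll(Duration timeout);
}

class DurationMatcherSketch {
    static List<String> pollOnce() {
        RecordSource source = mock(RecordSource.class);

        // The matcher is the direct argument of the mocked method.
        when(source.poll(any(Duration.class))).thenReturn(List.of("record"));

        // Any non-null Duration matches the stub, whatever its length.
        List<String> result = source.poll(Duration.ofMillis(100));

        // Passes only if poll() was called exactly once with some Duration.
        verify(source, times(1)).poll(any(Duration.class));
        return result;
    }
}
```

The same pattern should carry over to the KafkaConsumer mock in the question. Note that run() as posted loops forever, so a test that calls it directly will not reach the verify step unless the loop is given a way to stop.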