如何使用 Junit 测试 Kafka OnFailure 回调?

How to test Kafka OnFailure callback with Junit?

我有以下代码向 Kafka 发送数据:

@Service
public class KafkaSender{
    @Autowired
    private KafkaTemplate<String, Employee> kafkaTemplate;
    public void sendMessage(Employee employee) {

        ObjectMapper objectMapper = new ObjectMapper();
        ListenableFuture<SendResult<String, Employee>> listenableFuture = kafkaTemplate.send(topic,employee);

        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Employee>>() {
            @Override
            public void onSuccess(SendResult<String, Employee> result) {
                // method to save in DB
                saveInDatabaseMethod(result.getProducerRecord());
            }

            @Override
            public void onFailure(Throwable ex) {
                // class cast exception occur here
                ProducerRecord<String, Employee> producerRecord = ((KafkaProducerException) ex).getFailedProducerRecord();
                saveInDatabaseMethod(producerRecord);
            }
        }
    }
}

我可以测试 OnSucess 回调方案,但无法测试 OnFailure 回调方案。

@Test
void test() throws InterruptedException, ExecutionException {

    Throwable ex = mock(Throwable.class);
    Employee employee = new Employee();

    when(kafkaTemplate.send(null,employee )).thenReturn(responseFuture);
    when(sendResult.getProducerRecord()).thenReturn(producerRecord);
    when(producerRecord.value()).thenReturn(employee);

    doAnswer(invocationOnMock -> {
        ListenableFutureCallback<SendResult<String, Employee>> listenableFutureCallback = invocationOnMock.getArgument(0);
        listenableFutureCallback.onFailure(ex);
        return null;
    }).when(responseFuture).addCallback(any(ListenableFutureCallback.class));
    
    kafkaSender.sendMessage(employee);
}

以上测试抛出:

java.lang.ClassCastException: org.mockito.codegen.Throwable$MockitoMock37573915 cannot be cast to org.springframework.kafka.core.KafkaProducerException

ProducerRecord<String, Employee> producerRecord = ((KafkaProducerException) ex).getFailedProducerRecord();

你的模拟不是用 KPE 调用回调,而是用这个调用它

Throwable ex = mock(Throwable.class);

您需要将其包装在 KPE 中。