How to test Kafka OnFailure callback with JUnit?
I have the following code to send data to Kafka:
@Service
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, Employee> kafkaTemplate;

    public void sendMessage(Employee employee) {
        ObjectMapper objectMapper = new ObjectMapper();
        ListenableFuture<SendResult<String, Employee>> listenableFuture = kafkaTemplate.send(topic, employee);
        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Employee>>() {
            @Override
            public void onSuccess(SendResult<String, Employee> result) {
                // method to save in DB
                saveInDatabaseMethod(result.getProducerRecord());
            }

            @Override
            public void onFailure(Throwable ex) {
                // ClassCastException occurs here
                ProducerRecord<String, Employee> producerRecord = ((KafkaProducerException) ex).getFailedProducerRecord();
                saveInDatabaseMethod(producerRecord);
            }
        });
    }
}
I am able to test the onSuccess callback scenario, but I am not able to test the onFailure one.
@Test
void test() throws InterruptedException, ExecutionException {
    Throwable ex = mock(Throwable.class);
    Employee employee = new Employee();
    when(kafkaTemplate.send(null, employee)).thenReturn(responseFuture);
    when(sendResult.getProducerRecord()).thenReturn(producerRecord);
    when(producerRecord.value()).thenReturn(employee);
    doAnswer(invocationOnMock -> {
        ListenableFutureCallback<SendResult<String, Employee>> listenableFutureCallback = invocationOnMock.getArgument(0);
        listenableFutureCallback.onFailure(ex);
        return null;
    }).when(responseFuture).addCallback(any(ListenableFutureCallback.class));
    kafkaSender.sendMessage(employee);
}
The above test throws:
java.lang.ClassCastException: org.mockito.codegen.Throwable$MockitoMock37573915 cannot be cast to org.springframework.kafka.core.KafkaProducerException
ProducerRecord<String, Employee> producerRecord = ((KafkaProducerException) ex).getFailedProducerRecord();
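Your mock is not invoking the callback with a KafkaProducerException (KPE); it is invoking it with this:
Throwable ex = mock(Throwable.class);
A mocked Throwable is not a KafkaProducerException, so the cast in onFailure fails. You need to wrap it in a KPE.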
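To make the point concrete, here is a minimal, dependency-free sketch of the same failure and fix. It does not use Spring Kafka: `FailedSendException` and `FailureCallback` are hypothetical stand-ins for `KafkaProducerException` and `ListenableFutureCallback`, and the record is reduced to a plain `String`. It only illustrates that a callback which casts unconditionally must be handed the exception type it expects.

```java
import java.util.ArrayList;
import java.util.List;

public class OnFailureSketch {

    // Hypothetical stand-in for KafkaProducerException:
    // an exception that carries the record that failed to send.
    static class FailedSendException extends RuntimeException {
        private final String failedRecord;

        FailedSendException(String failedRecord, String message, Throwable cause) {
            super(message, cause);
            this.failedRecord = failedRecord;
        }

        String getFailedRecord() {
            return failedRecord;
        }
    }

    // Hypothetical stand-in for the callback passed to addCallback(...).
    interface FailureCallback {
        void onFailure(Throwable ex);
    }

    // Collects what the callback "saved", so the effect can be inspected.
    static final List<String> saved = new ArrayList<>();

    // Same shape as onFailure in the question: it casts unconditionally.
    static final FailureCallback CALLBACK = ex ->
            saved.add(((FailedSendException) ex).getFailedRecord());

    public static void main(String[] args) {
        // A plain Throwable (the role played by mock(Throwable.class)) fails the cast.
        try {
            CALLBACK.onFailure(new Throwable("boom"));
        } catch (ClassCastException e) {
            System.out.println("plain Throwable: ClassCastException");
        }

        // Wrapping the cause in the expected exception type lets the cast succeed.
        Throwable wrapped = new FailedSendException(
                "employee-1", "send failed", new RuntimeException("boom"));
        CALLBACK.onFailure(wrapped);
        System.out.println("saved = " + saved);
    }
}
```

In the actual test, the equivalent change would be to replace `mock(Throwable.class)` with a real instance, something like `new KafkaProducerException(producerRecord, "send failed", new RuntimeException("boom"))`. That assumes the (failed record, message, cause) constructor is available in your spring-kafka version, so check it against the Javadoc before relying on it.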