如何验证sprng kafka producer是否成功发送消息?
How to verify sprng kafka producer has successfully sent message or not?
我正在尝试测试 spring kafka(2.2.5.RELEASE),当生产者使用 kafkatemplate 发送消息时,我想知道该消息是否发送成功。基于此,我想更新该消息 ID 的数据库记录。处理这种情况的最佳做法是什么?
这是检查成功或失败的示例代码
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test_topic", key, userMsg);
SendResult<String, String> result = null;
try {
result = future.get(10000, TimeUnit.MILLISECONDS);
LOGGER.info("Send Result : {}", result);
LOGGER.info("Saving entry in db");
messageRepo.save(result.getProducerRecord().key().toString(),result.getProducerRecord().value().toString());
} catch (Exception e) {
LOGGER.error("Error publishing message ", e);
messageRepo.save(result.getProducerRecord().key().toString(),result.getProducerRecord().value().toString());
}
send()
操作return一个ListenableFuture<>
是异步完成的。
如果要阻塞调用线程获取结果,使用
future.get(timeout, TimeUnit.MILLISECONDS);
我正在尝试测试 spring kafka(2.2.5.RELEASE),当生产者使用 kafkatemplate 发送消息时,我想知道该消息是否发送成功。基于此,我想更新该消息 ID 的数据库记录。处理这种情况的最佳做法是什么?
这是检查成功或失败的示例代码
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test_topic", key, userMsg);
SendResult<String, String> result = null;
try {
result = future.get(10000, TimeUnit.MILLISECONDS);
LOGGER.info("Send Result : {}", result);
LOGGER.info("Saving entry in db");
messageRepo.save(result.getProducerRecord().key().toString(),result.getProducerRecord().value().toString());
} catch (Exception e) {
LOGGER.error("Error publishing message ", e);
messageRepo.save(result.getProducerRecord().key().toString(),result.getProducerRecord().value().toString());
}
send()
操作return一个ListenableFuture<>
是异步完成的。
如果要阻塞调用线程获取结果,使用
future.get(timeout, TimeUnit.MILLISECONDS);