处理 Kafka 超时异常 Java
Handling Kafka timeout exception Java
我在我的应用程序中使用 Kafka,我想在 kafka 发送消息期间处理超时异常。我尝试接下来做:在回调时抛出运行时异常并在方法调用中捕获它。
这是kafka调用:
public final class ManualKafkaBetsProducer implements IKafkaBetsProducer
{
@Autowired
private KafkaTemplate<ReportsBetKey, IReportsBet> template;
public void send(IReportsBet bet)
{
template.send(MessageBuilder.withPayload(bet).setHeader(KafkaHeaders.TOPIC, kafkaBetsTopicName)
.setHeader(KafkaHeaders.MESSAGE_KEY, new ReportsBetKey(bet)).build())
.addCallback(res -> logger.debug("Message {} send OK"), ex -> {
throw new RuntimeException(ex);
});
}
}
这是方法调用:
try
{
manualKafkaBetsProducer.send(bet);
}
catch (Exception ex)
{
return TransactionResultType.GENERAL_ERROR.name();
}
问题是:当发生超时expeption时,它没有进入catch块。当发生其他错误时,它可以正常工作。
任何想法如何处理这个?
谢谢。
你的send
方法returns一个ListenableFuture<SendResult>
。
如果发送失败时你想在发送线程上得到一个异常,你可以等待这个 future 完成(通过调用 get()
)并检查结果。
如果您不希望发送线程等待,callback
很有用。它让您有机会让另一个线程稍后获取结果。
我在我的应用程序中使用 Kafka,我想在 kafka 发送消息期间处理超时异常。我尝试接下来做:在回调时抛出运行时异常并在方法调用中捕获它。
这是kafka调用:
public final class ManualKafkaBetsProducer implements IKafkaBetsProducer
{
@Autowired
private KafkaTemplate<ReportsBetKey, IReportsBet> template;
public void send(IReportsBet bet)
{
template.send(MessageBuilder.withPayload(bet).setHeader(KafkaHeaders.TOPIC, kafkaBetsTopicName)
.setHeader(KafkaHeaders.MESSAGE_KEY, new ReportsBetKey(bet)).build())
.addCallback(res -> logger.debug("Message {} send OK"), ex -> {
throw new RuntimeException(ex);
});
}
}
这是方法调用:
try
{
manualKafkaBetsProducer.send(bet);
}
catch (Exception ex)
{
return TransactionResultType.GENERAL_ERROR.name();
}
问题是:当发生超时expeption时,它没有进入catch块。当发生其他错误时,它可以正常工作。 任何想法如何处理这个? 谢谢。
你的send
方法returns一个ListenableFuture<SendResult>
。
如果发送失败时你想在发送线程上得到一个异常,你可以等待这个 future 完成(通过调用 get()
)并检查结果。
如果您不希望发送线程等待,callback
很有用。它让您有机会让另一个线程稍后获取结果。