处理 Kafka 超时异常 Java

Handling Kafka timeout exception Java

我在我的应用程序中使用 Kafka,我想在 kafka 发送消息期间处理超时异常。我尝试接下来做:在回调时抛出运行时异常并在方法调用中捕获它。

这是kafka调用:

public final class ManualKafkaBetsProducer implements IKafkaBetsProducer
{
    @Autowired
    private KafkaTemplate<ReportsBetKey, IReportsBet> template;

    public void send(IReportsBet bet)
    {
        template.send(MessageBuilder.withPayload(bet).setHeader(KafkaHeaders.TOPIC, kafkaBetsTopicName)
                .setHeader(KafkaHeaders.MESSAGE_KEY, new ReportsBetKey(bet)).build())
                .addCallback(res -> logger.debug("Message {} send OK"), ex -> {
                    throw new RuntimeException(ex);
                });
    }
}

这是方法调用:

try
{
    manualKafkaBetsProducer.send(bet);
}
catch (Exception ex)
{
     return TransactionResultType.GENERAL_ERROR.name();
}

问题是:当发生超时expeption时,它没有进入catch块。当发生其他错误时,它可以正常工作。 任何想法如何处理这个? 谢谢。

你的send方法returns一个ListenableFuture<SendResult>

如果发送失败时你想在发送线程上得到一个异常,你可以等待这个 future 完成(通过调用 get() )并检查结果。

如果您希望发送线程等待,callback 很有用。它让您有机会让另一个线程稍后获取结果。