同步卡夫卡生产者有没有例外

Is there any exception for synchronous Kafka producer

如您所知,发送消息有两种方式 - 同步和异步。

当我们使用同步模式编码时,代码如下所示

producer.send(new ProducerRecord<Long, Event>(topicName, event)).get();

读取Kafka文档https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-,定义如下:

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record)

Asynchronously send a record to a topic. Equivalent to send(record, null). See send(ProducerRecord, Callback) for details.

Specified by:
    send in interface Producer<K,V> 

所以,基本上是 send() 方法 return 更进一步,一旦我为这个未来使用 .get() ,它就变成了同步行为。

我的问题是,从定义上看,我没有看到异常定义,如何捕获同步send()下的异常?似乎没有定义任何异常。有人可以帮忙澄清一下吗?

如果发送的记录不知何故无效并且可以在与 Kafka 集群通信之前被拒绝,那么 .send 将立即抛出 KafkaException(例如,如果记录太大,您将收到 RecordTooLargeException)。

如果集群端出现问题,例如分区不可用,您将得到一个由 .get() 的 ExecutionException 包装的 KafkaException。

还有一些极端情况,例如抛出 BufferExhaustedException 或一般异常 - 您可能需要查看源代码(尤其是 KafkaProducer 的 doSend),不幸的是文档并不完美。

查看该方法中来源的评论:

            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly