同步卡夫卡生产者有没有例外
Is there any exception for synchronous Kafka producer
如您所知,发送消息有两种方式 - 同步和异步。
当我们使用同步模式编码时,代码如下所示
producer.send(new ProducerRecord<Long, Event>(topicName, event)).get();
public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record)
Asynchronously send a record to a topic. Equivalent to send(record, null). See send(ProducerRecord, Callback) for details.
Specified by:
send in interface Producer<K,V>
所以,基本上是 send() 方法 return 更进一步,一旦我为这个未来使用 .get() ,它就变成了同步行为。
我的问题是,从定义上看,我没有看到异常定义,如何捕获同步send()下的异常?似乎没有定义任何异常。有人可以帮忙澄清一下吗?
如果发送的记录不知何故无效并且可以在与 Kafka 集群通信之前被拒绝,那么 .send
将立即抛出 KafkaException(例如,如果记录太大,您将收到 RecordTooLargeException)。
如果集群端出现问题,例如分区不可用,您将得到一个由 .get()
的 ExecutionException 包装的 KafkaException。
还有一些极端情况,例如抛出 BufferExhaustedException 或一般异常 - 您可能需要查看源代码(尤其是 KafkaProducer 的 doSend
),不幸的是文档并不完美。
查看该方法中来源的评论:
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
如您所知,发送消息有两种方式 - 同步和异步。
当我们使用同步模式编码时,代码如下所示
producer.send(new ProducerRecord<Long, Event>(topicName, event)).get();
public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record)
Asynchronously send a record to a topic. Equivalent to send(record, null). See send(ProducerRecord, Callback) for details.
Specified by:
send in interface Producer<K,V>
所以,基本上是 send() 方法 return 更进一步,一旦我为这个未来使用 .get() ,它就变成了同步行为。
我的问题是,从定义上看,我没有看到异常定义,如何捕获同步send()下的异常?似乎没有定义任何异常。有人可以帮忙澄清一下吗?
如果发送的记录不知何故无效并且可以在与 Kafka 集群通信之前被拒绝,那么 .send
将立即抛出 KafkaException(例如,如果记录太大,您将收到 RecordTooLargeException)。
如果集群端出现问题,例如分区不可用,您将得到一个由 .get()
的 ExecutionException 包装的 KafkaException。
还有一些极端情况,例如抛出 BufferExhaustedException 或一般异常 - 您可能需要查看源代码(尤其是 KafkaProducer 的 doSend
),不幸的是文档并不完美。
查看该方法中来源的评论:
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly