Distinguish how to handle exceptions in async Kafka producer

When producing messages to Kafka, you may run into two kinds of errors: retriable and non-retriable. How should you distinguish between them when handling failures?

I want to produce records asynchronously, saving the records whose callback object receives a non-retriable exception to another topic (or to HBase), and letting the producer handle for me all records that receive a retriable exception (up to the maximum number of attempts; once that limit is finally reached, the record is treated like the first group).

My question is: even with a callback object, will the producer still handle retriable exceptions by itself? Because the Callback interface says:

Retriable exceptions (transient, may be covered by increasing # .retries)
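As an aside, the retry behavior mentioned in that quote is driven by producer configuration. A minimal sketch of the relevant settings (the string keys are standard producer config names; the broker address is a placeholder):

```scala
import java.util.Properties

// Minimal sketch of the producer settings that control automatic retries.
// "localhost:9092" is a placeholder broker address.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("retries", "3")            // how many times retriable errors are retried
props.put("retry.backoff.ms", "100") // pause between retry attempts
```

With `retries` greater than 0, the producer retries transient failures internally; the callback should only see an exception for a non-retriable error or once the retries are exhausted.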

Could the code look like this?

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}
import org.apache.kafka.common.errors.{RecordTooLargeException, UnknownServerException}

producer.send(record, callback)

def callback: Callback = new Callback {
  override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
    if (e != null) {
      if (e.isInstanceOf[RecordTooLargeException] || e.isInstanceOf[UnknownServerException] /* || .. */) {
        log.error("Winter is coming") // non-retriable: save the record elsewhere
        writeDiscardRecordsToSomewhereElse
      } else {
        log.warn("It's not that cold") // it's retriable. Will the producer keep trying by itself?
      }
    } else {
      log.debug("It's summer. Everything is fine")
    }
  }
}

Kafka version: 0.10.0

Any light on this would be appreciated! :)

As the Kafka bible (a.k.a. Kafka: The Definitive Guide) puts it:

The drawback is that while commitSync() will retry the commit until it either succeeds or encounters a nonretriable failure, commitAsync() will not retry.

The reason:

The reason it does not retry is that by the time commitAsync() receives a response from the server, there may have been a later commit that was already successful.

Imagine that we sent a request to commit offset 2000. There is a temporary communication problem, so the broker never gets the request and therefore never responds. Meanwhile, we processed another batch and successfully committed offset 3000. If commitAsync() now retries the previously failed commit, it might succeed in committing offset 2000 after offset 3000 was already processed and committed. In the case of a rebalance, this will cause more duplicates.

That said, you can still keep a monotonically increasing sequence number: increment it on every commit and attach the current value to the callback object. When it is time to retry, just check whether the current value of the counter equals the number you gave the callback. If it does, there has been no newer commit and it is safe to retry; otherwise, a newer commit has already happened and you should not retry this offset commit.
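The sequence-number guard above can be sketched roughly as follows. The names `issueCommit` and `maybeRetry` are illustrative, not a Kafka API, and `doCommit` stands in for the real `consumer.commitAsync` call; only the counter logic is the point here:

```scala
import java.util.concurrent.atomic.AtomicLong

// Monotonically increasing sequence number shared by all commits.
val commitSeq = new AtomicLong(0)

// Called every time we issue a commit: stamp it with a fresh number.
def issueCommit(doCommit: () => Unit): Long = {
  val seq = commitSeq.incrementAndGet()
  doCommit()
  seq // pass this number into the commit callback
}

// Called from the callback when the commit failed with a retriable error.
def maybeRetry(seqFromCallback: Long, doCommit: () => Unit): Boolean =
  if (commitSeq.get() == seqFromCallback) { doCommit(); true } // still the latest: safe to retry
  else false // a newer commit was issued meanwhile: drop this retry
```

The stale commit of offset 2000 from the example would fail the equality check, because committing offset 3000 already bumped the counter.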

If that sounds cumbersome, it is: if you find yourself needing this, you should probably change your strategy instead.