Kafka producer callback Exception

When we produce messages, we can define a callback, and this callback can expect an exception:

kafkaProducer.send(producerRecord, new Callback() {
  public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    if (e == null) {
      // OK
    } else {
      // NOT OK
    }
  }
});

Given the retry logic built into the producer, which kinds of exceptions should a developer handle explicitly?

You may get a BufferExhaustedException or a TimeoutException.

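Just bring your Kafka broker down after the producer has produced one record, then keep producing records. After some time, you should see exceptions in the callback.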

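This is because the metadata is fetched when you send the first record. After that, records are batched and buffered, and they eventually expire after a timeout, at which point you may see these exceptions.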

I suppose the timeout in question is delivery.timeout.ms, which gives you a TimeoutException when it expires.

According to the Callback Java Docs, the following exceptions may be passed to the callback:

The exception thrown during processing of this record. Null if no error occurred. Possible thrown exceptions include:

Non-Retriable exceptions (fatal, the message will never be sent):

  • InvalidTopicException
  • OffsetMetadataTooLargeException
  • RecordBatchTooLargeException
  • RecordTooLargeException
  • UnknownServerException

Retriable exceptions (transient, may be covered by increasing #.retries):

  • CorruptRecordException
  • InvalidMetadataException
  • NotEnoughReplicasAfterAppendException
  • NotEnoughReplicasException
  • OffsetOutOfRangeException
  • TimeoutException
  • UnknownTopicOrPartitionException
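The two lists above can be turned into a small lookup. What follows is only a minimal sketch: the class name CallbackErrorClassifier, the Kind enum, and the idea of matching on the exception's simple class name are all assumptions made for illustration, chosen so the example runs without the Kafka client on the classpath. With the client available, a type check (for example against org.apache.kafka.common.errors.RetriableException) would be more robust than comparing names.

```java
import java.util.Set;

// Hypothetical helper: maps an exception passed to the callback onto the
// categories listed in the Callback Javadoc quoted above.
final class CallbackErrorClassifier {

    enum Kind { NONE, NON_RETRIABLE, RETRIABLE, UNLISTED }

    // Names copied from the "Non-Retriable exceptions" list.
    private static final Set<String> NON_RETRIABLE = Set.of(
            "InvalidTopicException",
            "OffsetMetadataTooLargeException",
            "RecordBatchTooLargeException",
            "RecordTooLargeException",
            "UnknownServerException");

    // Names copied from the "Retriable exceptions" list.
    private static final Set<String> RETRIABLE = Set.of(
            "CorruptRecordException",
            "InvalidMetadataException",
            "NotEnoughReplicasAfterAppendException",
            "NotEnoughReplicasException",
            "OffsetOutOfRangeException",
            "TimeoutException",
            "UnknownTopicOrPartitionException");

    // Assumption: matching on the simple class name only. This also matches
    // unrelated classes that happen to share a name, so treat it as a sketch.
    static Kind classify(Exception e) {
        if (e == null) {
            return Kind.NONE; // no error occurred
        }
        String name = e.getClass().getSimpleName();
        if (NON_RETRIABLE.contains(name)) {
            return Kind.NON_RETRIABLE;
        }
        if (RETRIABLE.contains(name)) {
            return Kind.RETRIABLE;
        }
        return Kind.UNLISTED; // not mentioned in the Javadoc lists
    }

    public static void main(String[] args) {
        System.out.println(classify(null));
        System.out.println(classify(new IllegalStateException("boom")));
    }
}
```

Inside onCompletion, the result of classify(e) could drive a switch that logs, reroutes, or drops the record, depending on what the application needs.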

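Maybe this is an unsatisfying answer, but in the end, which exceptions to handle and how to handle them depends entirely on your use case and business requirements.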

Handling Producer Retries

However, as a developer you also need to deal with the retry mechanism of the Kafka Producer itself. Retries are mainly driven by:

retries: Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection (default: 5) to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first. Note additionally that produce requests will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires first before successful acknowledgement. Users should generally prefer to leave this config unset and instead use delivery.timeout.ms to control retry behavior.

retry.backoff.ms: The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

request.timeout.ms: The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted. This should be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

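The recommendation is to keep the default values of the three configurations above and instead focus on the hard upper time limit defined by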

delivery.timeout.ms: An upper bound on the time to report success or failure after a call to send() returns. This limits the total time that a record will be delayed prior to sending, the time to await acknowledgement from the broker (if expected), and the time allowed for retriable send failures. The producer may report failure to send a record earlier than this config if either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch which reached an earlier delivery expiration deadline. The value of this config should be greater than or equal to the sum of request.timeout.ms and linger.ms.
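The last sentence of that description states a constraint that can be checked before a producer is created. Here is a minimal sketch, assuming a hypothetical helper class ProducerTimeoutConfig and illustrative timeout values; only the three property names come from the configuration docs quoted above.

```java
import java.util.Properties;

// Hypothetical helper: builds the timeout-related producer properties and
// rejects combinations that violate the documented constraint
// delivery.timeout.ms >= request.timeout.ms + linger.ms.
final class ProducerTimeoutConfig {

    static Properties build(long deliveryTimeoutMs, long requestTimeoutMs, long lingerMs) {
        if (deliveryTimeoutMs < requestTimeoutMs + lingerMs) {
            throw new IllegalArgumentException(
                    "delivery.timeout.ms (" + deliveryTimeoutMs + ") must be >= "
                    + "request.timeout.ms + linger.ms (" + (requestTimeoutMs + lingerMs) + ")");
        }
        Properties props = new Properties();
        props.setProperty("delivery.timeout.ms", Long.toString(deliveryTimeoutMs));
        props.setProperty("request.timeout.ms", Long.toString(requestTimeoutMs));
        props.setProperty("linger.ms", Long.toString(lingerMs));
        return props;
    }

    public static void main(String[] args) {
        // Illustrative values only: up to two minutes overall, 30 s per request.
        Properties props = build(120_000L, 30_000L, 0L);
        System.out.println(props.getProperty("delivery.timeout.ms"));
    }
}
```

The resulting Properties would still need bootstrap servers and serializers before being handed to a KafkaProducer; they are left out here to keep the focus on the timeout relationship.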

To add more information to @Mike's answer: I think only a few of the possible exceptions are enumerated in the Callback interface.

Here you can see the whole list: org.apache.kafka.common.errors

And here you can see which ones are retriable and which ones are not: Kafka protocol guide

The code could look like this:

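import org.apache.kafka.clients.producer.{Callback, RecordMetadata}
import org.apache.kafka.common.errors.{RecordTooLargeException, UnknownServerException}

producer.send(record, callback)

def callback: Callback = new Callback {
    override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
      if (null != e) {
         e match {
           // Match on the exception type; comparing with == against a class name does not compile.
           case _: RecordTooLargeException | _: UnknownServerException => // .. and the other non-retriable types
             log.error("Winter is coming") // it's non-retriable
             writeDiscardRecordsToSomewhereElse
           case _ =>
             log.warn("It's not that cold") // it's retriable
         }
      } else {
        log.debug("It's summer. Everything is fine")
      }
    }
}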