处理 Kafka Producer 超时异常的指南?

Guidelines to handle Timeout exception for Kafka Producer?

我的Kafka producer由于各种原因经常出现Timeout异常。我目前使用生产者配置的所有默认值。

我看到了以下超时异常:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for topic-1-0: 30001 ms has passed since last append

我有以下问题:

  1. 这些超时异常的一般原因是什么?

    1. 临时网络问题
    2. 服务器问题?如果是那么是什么类型的服务器问题?
  2. 处理超时异常的一般准则是什么?

    1. 设置 'retries' 配置以便 Kafka API 进行重试?
    2. 增加 'request.timeout.ms' 或 'max.block.ms' ?
    3. 捕获异常并让应用程序层重试发送消息,但这似乎很难使用异步发送,因为消息将被乱序发送?
  3. 超时异常是可重试的异常吗?重试它们是否安全?

我正在使用 Kafka v2.1.0 和 Java 11.

提前致谢。

生产者和代理的默认 Kafka 配置值都足够保守,在一般情况下,您不应该 运行 进入任何超时。这些问题通常指向生产者和经纪人之间的 flaky/lossy 网络。

您遇到的异常 Failed to update metadata,通常意味着生产者无法访问其中一个代理,结果是它无法获取元数据。

对于您的第二个问题,Kafka 将自动重试发送未被代理完全确认的消息。当您在应用程序端超时时是否要捕获并重试由您决定,但如果您达到 1 分钟以上的超时,重试可能不会产生太大影响。无论如何,您将不得不找出经纪人的潜在 network/reachability 问题。

根据我的经验,网络问题通常是:

  • 端口 9092 被防火墙阻止,无论是在生产者端还是在代理端,或者在中间的某个地方(尝试 nc -z broker-ip 9092 来自服务器 运行 生产者)
  • DNS 解析被破坏,因此即使端口打开,生产者也无法解析到 IP 地址。

"What are the general causes of these Timeout exceptions?"

  1. 我之前看到的最常见原因是元数据信息过时:一个代理宕机,该代理上的主题分区被故障转移到其他代理。但是,主题元数据信息尚未正确更新,客户端仍会尝试与失败的代理对话以获取元数据信息或发布消息。这会导致超时异常。

  2. 网络连接问题。这可以很容易地诊断为 telnet broker_host borker_port

  3. 代理过载。如果代理因高工作负载而饱和,或者托管太多主题分区,就会发生这种情况。

处理超时异常,一般做法是:

  1. 排除经纪人方面的问题。确保主题分区已完全复制,并且代理未超载

  2. 修复主机名解析或网络连接问题(如果有的话)

  3. 调整request.timeout.msdelivery.timeout.ms等参数。我过去的经验是默认值在大多数情况下都能正常工作。

如果生产者或消费者无法访问 "advertised.listeners"(protocol://host:port) 的值,则会发生超时异常

通过以下命令检查 属性 "advertised.listeners" 的配置:

cat $KAFKA_HOME/config/server.properties

我建议在构建 Producer 配置时使用以下属性

需要分区 - 领导者的确认

kafka.acks=1

kafka 生产者发送消息和接收来自领导者的确认的最大重试次数

kafka.retries=3

每个单独请求的请求超时

timeout.ms=200

等待再次发送下一个请求;这是为了避免在紧密循环中发送请求;

retry.backoff.ms=50

完成所有重试的上限

dataLogger.kafka.delivery.timeout.ms=1200

producer.send(record, new Callback {
  override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
    if (e != null) {
      logger.debug(s"KafkaLogger : Message Sent $record to  Topic  ${recordMetadata.topic()}, Partition ${recordMetadata.partition()} , Offset ${recordMetadata.offset()} ")
    } else {
      logger.error(s"Exception while sending message $item to Error topic :$e")
    }
  }
})

超时关闭生产者

producer.close(1000, TimeUnit.MILLISECONDS)