处理 Kafka Producer 超时异常的指南?
Guidelines to handle Timeout exception for Kafka Producer?
我的Kafka producer由于各种原因经常出现Timeout异常。我目前使用生产者配置的所有默认值。
我看到了以下超时异常:
org.apache.kafka.common.errors.TimeoutException: Failed to update
metadata after 60000 ms.
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s)
for topic-1-0: 30001 ms has passed since last append
我有以下问题:
这些超时异常的一般原因是什么?
- 临时网络问题
- 服务器问题?如果是那么是什么类型的服务器问题?
处理超时异常的一般准则是什么?
- 设置 'retries' 配置以便 Kafka API 进行重试?
- 增加 'request.timeout.ms' 或 'max.block.ms' ?
- 捕获异常并让应用程序层重试发送消息,但这似乎很难使用异步发送,因为消息将被乱序发送?
超时异常是可重试的异常吗?重试它们是否安全?
我正在使用 Kafka v2.1.0 和 Java 11.
提前致谢。
生产者和代理的默认 Kafka 配置值都足够保守,在一般情况下,您不应该 运行 进入任何超时。这些问题通常指向生产者和经纪人之间的 flaky/lossy 网络。
您遇到的异常 Failed to update metadata
,通常意味着生产者无法访问其中一个代理,结果是它无法获取元数据。
对于您的第二个问题,Kafka 将自动重试发送未被代理完全确认的消息。当您在应用程序端超时时是否要捕获并重试由您决定,但如果您达到 1 分钟以上的超时,重试可能不会产生太大影响。无论如何,您将不得不找出经纪人的潜在 network/reachability 问题。
根据我的经验,网络问题通常是:
- 端口 9092 被防火墙阻止,无论是在生产者端还是在代理端,或者在中间的某个地方(尝试
nc -z broker-ip 9092
来自服务器 运行 生产者)
- DNS 解析被破坏,因此即使端口打开,生产者也无法解析到 IP 地址。
"What are the general causes of these Timeout exceptions?"
我之前看到的最常见原因是元数据信息过时:一个代理宕机,该代理上的主题分区被故障转移到其他代理。但是,主题元数据信息尚未正确更新,客户端仍会尝试与失败的代理对话以获取元数据信息或发布消息。这会导致超时异常。
网络连接问题。这可以很容易地诊断为 telnet broker_host borker_port
代理过载。如果代理因高工作负载而饱和,或者托管太多主题分区,就会发生这种情况。
处理超时异常,一般做法是:
排除经纪人方面的问题。确保主题分区已完全复制,并且代理未超载
修复主机名解析或网络连接问题(如果有的话)
调整request.timeout.ms
、delivery.timeout.ms
等参数。我过去的经验是默认值在大多数情况下都能正常工作。
如果生产者或消费者无法访问 "advertised.listeners"(protocol://host:port) 的值,则会发生超时异常
通过以下命令检查 属性 "advertised.listeners" 的配置:
cat $KAFKA_HOME/config/server.properties
我建议在构建 Producer 配置时使用以下属性
需要分区 - 领导者的确认
kafka.acks=1
kafka 生产者发送消息和接收来自领导者的确认的最大重试次数
kafka.retries=3
每个单独请求的请求超时
timeout.ms=200
等待再次发送下一个请求;这是为了避免在紧密循环中发送请求;
retry.backoff.ms=50
完成所有重试的上限
dataLogger.kafka.delivery.timeout.ms=1200
producer.send(record, new Callback {
override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
if (e != null) {
logger.debug(s"KafkaLogger : Message Sent $record to Topic ${recordMetadata.topic()}, Partition ${recordMetadata.partition()} , Offset ${recordMetadata.offset()} ")
} else {
logger.error(s"Exception while sending message $item to Error topic :$e")
}
}
})
超时关闭生产者
producer.close(1000, TimeUnit.MILLISECONDS)
我的Kafka producer由于各种原因经常出现Timeout异常。我目前使用生产者配置的所有默认值。
我看到了以下超时异常:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for topic-1-0: 30001 ms has passed since last append
我有以下问题:
这些超时异常的一般原因是什么?
- 临时网络问题
- 服务器问题?如果是那么是什么类型的服务器问题?
处理超时异常的一般准则是什么?
- 设置 'retries' 配置以便 Kafka API 进行重试?
- 增加 'request.timeout.ms' 或 'max.block.ms' ?
- 捕获异常并让应用程序层重试发送消息,但这似乎很难使用异步发送,因为消息将被乱序发送?
超时异常是可重试的异常吗?重试它们是否安全?
我正在使用 Kafka v2.1.0 和 Java 11.
提前致谢。
生产者和代理的默认 Kafka 配置值都足够保守,在一般情况下,您不应该 运行 进入任何超时。这些问题通常指向生产者和经纪人之间的 flaky/lossy 网络。
您遇到的异常 Failed to update metadata
,通常意味着生产者无法访问其中一个代理,结果是它无法获取元数据。
对于您的第二个问题,Kafka 将自动重试发送未被代理完全确认的消息。当您在应用程序端超时时是否要捕获并重试由您决定,但如果您达到 1 分钟以上的超时,重试可能不会产生太大影响。无论如何,您将不得不找出经纪人的潜在 network/reachability 问题。
根据我的经验,网络问题通常是:
- 端口 9092 被防火墙阻止,无论是在生产者端还是在代理端,或者在中间的某个地方(尝试
nc -z broker-ip 9092
来自服务器 运行 生产者) - DNS 解析被破坏,因此即使端口打开,生产者也无法解析到 IP 地址。
"What are the general causes of these Timeout exceptions?"
我之前看到的最常见原因是元数据信息过时:一个代理宕机,该代理上的主题分区被故障转移到其他代理。但是,主题元数据信息尚未正确更新,客户端仍会尝试与失败的代理对话以获取元数据信息或发布消息。这会导致超时异常。
网络连接问题。这可以很容易地诊断为
telnet broker_host borker_port
代理过载。如果代理因高工作负载而饱和,或者托管太多主题分区,就会发生这种情况。
处理超时异常,一般做法是:
排除经纪人方面的问题。确保主题分区已完全复制,并且代理未超载
修复主机名解析或网络连接问题(如果有的话)
调整
request.timeout.ms
、delivery.timeout.ms
等参数。我过去的经验是默认值在大多数情况下都能正常工作。
如果生产者或消费者无法访问 "advertised.listeners"(protocol://host:port) 的值,则会发生超时异常
通过以下命令检查 属性 "advertised.listeners" 的配置:
cat $KAFKA_HOME/config/server.properties
我建议在构建 Producer 配置时使用以下属性
需要分区 - 领导者的确认
kafka.acks=1
kafka 生产者发送消息和接收来自领导者的确认的最大重试次数
kafka.retries=3
每个单独请求的请求超时
timeout.ms=200
等待再次发送下一个请求;这是为了避免在紧密循环中发送请求;
retry.backoff.ms=50
完成所有重试的上限
dataLogger.kafka.delivery.timeout.ms=1200
producer.send(record, new Callback {
override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
if (e != null) {
logger.debug(s"KafkaLogger : Message Sent $record to Topic ${recordMetadata.topic()}, Partition ${recordMetadata.partition()} , Offset ${recordMetadata.offset()} ")
} else {
logger.error(s"Exception while sending message $item to Error topic :$e")
}
}
})
超时关闭生产者
producer.close(1000, TimeUnit.MILLISECONDS)