Kafka adminClient 抛出 TimeoutException
Kafka adminClient throws TimeoutException
我有一个健康线程,每 5 秒从我的工作应用程序检查我的 Kafka 集群的状态。然而,我时不时地得到 TimeoutException
:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access[=10=]0(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
我也有工具可以从外部监控我的集群(Cruise Control
、Grafana
),其中 none 指出集群中的任何问题。此外,我的辅助应用程序不断地使用消息并且 none 似乎失败了。
为什么我偶尔会超时?如果代理没有关闭,那么我认为我的配置中的某些内容已关闭。我将超时设置为 5 秒,这似乎绰绰有余。
我的 AdminClient 配置:
@Bean
public AdminClient adminClient() {
return KafkaAdminClient.create(adminClientConfigs());
}
public Map<String, Object> adminClientConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, serverAddress);
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
return props;
}
我如何检查集群(我比代理列表上的 运行 逻辑):
@Autowired
private AdminClient adminClient;
private void addCluster() throws ExecutionException, InterruptedException {
adminClient.describeCluster().nodes().get().forEach(node -> brokers.add(node.host()));
}
2 件事:
默认请求超时为 30 秒。通过将其设置为较小的值,您会增加缓慢请求超时的风险。如果 1000 个请求中有一个请求 (0.1%) 超过 5 秒,因为你每隔几秒查询一次,你每天都会看到几次失败。
要调查为什么有些调用需要更长时间,您可以做几件事:
检查 Kafka 客户端日志。 describeCluster()
可能需要启动与集群的新连接。在这种情况下,客户端还必须发送 ApiVersionsRequest
并且根据您的配置,可能会建立 TLS 连接 and/or 执行 SASL 身份验证。如果发生任何这些,它应该在客户端日志中清楚。 (您可能需要稍微提高日志级别才能看到所有这些)。
检查代理请求指标。 describeCluster()
转换为 MetadataRequest
发送给经纪人。您可以跟踪处理请求所需的时间。请参阅 docs 中描述的指标,尤其是:kafka.network:type=RequestMetrics,name=*,request=Metadata
我有一个健康线程,每 5 秒从我的工作应用程序检查我的 Kafka 集群的状态。然而,我时不时地得到 TimeoutException
:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access[=10=]0(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
我也有工具可以从外部监控我的集群(Cruise Control
、Grafana
),其中 none 指出集群中的任何问题。此外,我的辅助应用程序不断地使用消息并且 none 似乎失败了。
为什么我偶尔会超时?如果代理没有关闭,那么我认为我的配置中的某些内容已关闭。我将超时设置为 5 秒,这似乎绰绰有余。
我的 AdminClient 配置:
@Bean
public AdminClient adminClient() {
return KafkaAdminClient.create(adminClientConfigs());
}
public Map<String, Object> adminClientConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, serverAddress);
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
return props;
}
我如何检查集群(我比代理列表上的 运行 逻辑):
@Autowired
private AdminClient adminClient;
private void addCluster() throws ExecutionException, InterruptedException {
adminClient.describeCluster().nodes().get().forEach(node -> brokers.add(node.host()));
}
2 件事:
默认请求超时为 30 秒。通过将其设置为较小的值,您会增加缓慢请求超时的风险。如果 1000 个请求中有一个请求 (0.1%) 超过 5 秒,因为你每隔几秒查询一次,你每天都会看到几次失败。
要调查为什么有些调用需要更长时间,您可以做几件事:
检查 Kafka 客户端日志。
describeCluster()
可能需要启动与集群的新连接。在这种情况下,客户端还必须发送ApiVersionsRequest
并且根据您的配置,可能会建立 TLS 连接 and/or 执行 SASL 身份验证。如果发生任何这些,它应该在客户端日志中清楚。 (您可能需要稍微提高日志级别才能看到所有这些)。检查代理请求指标。
describeCluster()
转换为MetadataRequest
发送给经纪人。您可以跟踪处理请求所需的时间。请参阅 docs 中描述的指标,尤其是:kafka.network:type=RequestMetrics,name=*,request=Metadata