Kafka adminClient 抛出 TimeoutException

Kafka adminClient throws TimeoutException

我有一个健康线程,每 5 秒从我的工作应用程序检查我的 Kafka 集群的状态。然而,我时不时地得到 TimeoutException:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access[=10=]0(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)

我也有工具可以从外部监控我的集群(Cruise ControlGrafana),其中 none 指出集群中的任何问题。此外,我的辅助应用程序不断地使用消息并且 none 似乎失败了。

为什么我偶尔会超时?如果代理没有关闭,那么我认为我的配置中的某些内容已关闭。我将超时设置为 5 秒,这似乎绰绰有余。

我的 AdminClient 配置:

 @Bean
public AdminClient adminClient() {
    return KafkaAdminClient.create(adminClientConfigs());
}

 public Map<String, Object> adminClientConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, serverAddress);
    props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
    return props;
}

我如何检查集群(我比代理列表上的 运行 逻辑):

@Autowired
private AdminClient adminClient;

 private void addCluster() throws ExecutionException, InterruptedException {
        adminClient.describeCluster().nodes().get().forEach(node -> brokers.add(node.host()));
    }

2 件事:

  1. 默认请求超时为 30 秒。通过将其设置为较小的值,您会增加缓慢请求超时的风险。如果 1000 个请求中有一个请求 (0.1%) 超过 5 秒,因为你每隔几秒查询一次,你每天都会看到几次失败。

  2. 要调查为什么有些调用需要更长时间,您可以做几件事:

    • 检查 Kafka 客户端日志。 describeCluster() 可能需要启动与集群的新连接。在这种情况下,客户端还必须发送 ApiVersionsRequest 并且根据您的配置,可能会建立 TLS 连接 and/or 执行 SASL 身份验证。如果发生任何这些,它应该在客户端日志中清楚。 (您可能需要稍微提高日志级别才能看到所有这些)。

    • 检查代理请求指标。 describeCluster() 转换为 MetadataRequest 发送给经纪人。您可以跟踪处理请求所需的时间。请参阅 docs 中描述的指标,尤其是:kafka.network:type=RequestMetrics,name=*,request=Metadata