如何为 KafkaStreams 中的主题添加健康检查 api

How to add health check for topics in KafkaStreams api

我有一个关键的 Kafka 应用程序需要一直 运行 启动。源主题由 debezium kafka connect for mysql binlog 创建。不幸的是,此设置可能会出现很多问题。很多时候 debezium 连接器失败并需要重新启动,然后我的应用程序也是如此(因为没有抛出任何异常它只是挂起并停止使用)。我手动测试和发现故障的方法是检查 kibana 日志,然后通过终端使用可疑主题。我可以在代码中模仿这一点,但显然不是最佳实践。我想知道 KafkaStream api 是否有能力让我做这样的健康检查,并检查 kafka 集群的其他部分? 另一点困扰我的是,当连接器再次启动时,我是否可以让流保持活跃并重新加入主题。

您可以检查 Kafka Streams State 以查看它是否 rebalancing/running,这表明操作正常。虽然,如果没有数据进入拓扑,我认为不会发生错误,因此您需要查找上游依赖项的运行状况。

总的来说,听起来您可能想花一些时间使用 Consul 或 Sensu 等监控工具,这些工具可以 运行 本地服务运行状况检查并在服务出现故障时发出警报。或者至少 Elasticseach alerting

就 Kafka 健康检查而言,您可以通过多种方式进行

  1. broker 和 zookeeper 进程 运行ning? (SSH 到节点,检查进程)
  2. broker 和 zookeeper 端口是否打开? (使用Socket连接)
  3. 是否有可以跟踪的重要 JMX 指标? (Metricbeat)
  4. 你能找到一个活跃的 Controller 代理吗(使用 AdminClient#describeCluster
  5. 作为 Controller 元数据(可以从 AdminClient 获得)的一部分,您是否需要响应最少数量的经纪人
  6. 您使用的主题配置是否正确? (保留、最小 isr、复制因子、分区计数等)? (再次使用 AdminClient