Kafka消费者健康检查

Kafka consumer health check

是否有一种简单的方法来判断消费者(使用 spring 引导和@KafkaListener 创建)是否正常运行? 这包括 - 可以访问和轮询代理,至少分配了一个分区等。

我看到有一些方法可以订阅不同的生命周期事件,但这似乎是一个非常脆弱的解决方案。

提前致谢!

您可以使用AdminClient获取当前群组状态...

@SpringBootApplication
public class So56134056Application {

    public static void main(String[] args) {
        SpringApplication.run(So56134056Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so56134056", 1, (short) 1);
    }

    @KafkaListener(id = "so56134056", topics = "so56134056")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(KafkaAdmin admin) {
        return args -> {
            try (AdminClient client = AdminClient.create(admin.getConfig())) {
                while (true) {
                    Map<String, ConsumerGroupDescription> map =
                            client.describeConsumerGroups(Collections.singletonList("so56134056")).all().get(10, TimeUnit.SECONDS);
                    System.out.println(map);
                    System.in.read();
                }
            }
        };
    }

}

{so56134056=(groupId=so56134056, isSimpleConsumerGroup=false, members=(memberId=consumer-2-32a80e0a-2b8d-4519-b71d-671117e7eaf8, clientId=consumer-2, host=/127.0.0.1, assignment=(topicPartitions=so56134056-0)), partitionAssignor=range, state=Stable, coordinator=localhost:9092 (id: 0 rack: null))}

我们一直在考虑将 getLastPollTime() 暴露给侦听器容器 API。

getAssignedPartitions() 从 2.1.3 开始可用。

我知道你没有在你的 post 中提到它 - 但如果你随后在 AWS 中部署并对你的 ELB 扩展环境使用这样的健康检查,请注意将这样的项目添加到健康检查中.

例如,可能发生的一种情况是您的应用程序失去与 Kafka 的连接 - 您的健康检查变为红色 - 然后弹性 beantalks 开始杀死和 re-starting 您的实例的过程(这将持续发生直到您的 Kafka 实例再次可用)。这可能代价高昂!

还有一个更普遍的哲学问题是健康检查是否应该 'cascade failures' 例如kafka 已关闭,因此连接到 kafka 的应用程序声称它已关闭,链中的下一个应用程序也执行相同操作,等等。这通常更通常通过断路器实现,断路器旨在最大限度地减少注定失败的慢速调用。

您可以使用AdminClient查看主题描述。

final AdminClient client = AdminClient.create(kafkaConsumerFactory.getConfigurationProperties());

final String topic = "someTopicName";

final DescribeTopicsResult describeTopicsResult = client.describeTopics(Collections.singleton(topic));
final KafkaFuture<TopicDescription> future = describeTopicsResult.values().get(topic);
                    
try {
  // for healthcheck purposes we're fetching the topic description
  future.get(10, TimeUnit.SECONDS);
} catch (final InterruptedException | ExecutionException | TimeoutException e) {
  throw new RuntimeException("Failed to retrieve topic description for topic: " + topic, e);
}