如何通过kafka中的java获取特定主题的消费者数量

How to get number of consumers for a particular topic through java in kafka

我需要在应用程序启动时自动启动 kafka 消费者。但在这样做之前,我想知道是否有任何其他消费者已经对该主题进行了投票。
我可以从 kafka confluent 站点看到这些详细信息,但我想通过 java 代码获取这些详细信息。
我知道 kafka 的 bin 文件夹中有一些 .sh 文件,但我想在生产环境中通过 java 代码执行此操作。

可以使用AdminClient来描述消费群体:

@SpringBootApplication
public class So67668813Application {

    public static void main(String[] args) {
        SpringApplication.run(So67668813Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so67668813").partitions(4).replicas(1).build();
    }

    @KafkaListener(id = "so67668813", topics = "so67668813")
    public void listen(String in) {
        System.out.println(in);
    }


    @Bean
    public ApplicationRunner runner(KafkaAdmin admin) {
        return args -> {
            Thread.sleep(2000);
            try (AdminClient client = AdminClient.create(admin.getConfigurationProperties())) {
                Map<String, ConsumerGroupDescription> groups =
                        client.describeConsumerGroups(Collections.singletonList("so67668813"))
                                .all()
                                .get(10, TimeUnit.SECONDS);
                System.out.println(groups);
            }
        };
    }

}

{so67668813=(groupId=so67668813, isSimpleConsumerGroup=false, members=(memberId=consumer-so67668813-1-620a9dd7-c995-461d-bb7d-c141457a5799, groupInstanceId=null, clientId=consumer-so67668813-1, host=/127.0.0.1, assignment=(topicPartitions=so67668813-3,so67668813-1,so67668813-2,so67668813-0)), partitionAssignor=range, state=Stable, coordinator=localhost:9092 (id: 0 rack: null), authorizedOperations=null)}