如何使用 @KafkaListener spring 引导 2 消费者获取消费者 ID
How to get consumer-id using a @KafkaListener spring boot 2 consumer
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer
是否可以在 Spring boot @KafkaListener consumer
中获取上述命令输出中的 consumer-id
信息?
我想将此消费者 ID 添加到 table 以表示处理数据的处理器。
我已经阅读了@gary-russell 在 How to get kafka consumer-id for logging 上的回答,但我没有看到 consumer-id 出现在分区分配的日志中。
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer.info - my-consumer: partitions assigned: [test_topic-7, test_topic-6, test_topic-5, test_topic-4]
[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer.info - my-consumer: partitions assigned: [test_topic-3, test_topic-2, test_topic-1, test_topic-0]
我正在使用 spring boot 2.2.2
依赖项:spring-kafka
您可能希望使用以下使用自动装配的方法链 AdminClient
:
public Collection<MemberDescription> members()
throws InterruptedException, ExecutionException, TimeoutException
{
String group = "my-consumer";
List<String> groups = Collections.singletonList(group);
return adminClient
.describeConsumerGroups(groups) // DescribeConsumerGroupsResult
.describedGroups() // Map<String, KafkaFuture<ConsumerGroupDescription>>
.get(group) // KafkaFuture<ConsumerGroupDescription>
.get(2, TimeUnit.SECONDS) // ConsumerGroupDescription
.members(); // Collection<MemberDescription>
}
MemberDescription
的返回集合包含您的消费者。该对象提供以下 "getters":
MemberDescription::clientId
和 CLIENT-ID
个条目。
MemberDescription::consumerId
和 COMSUMER-ID
个条目。
KafkaFuture
包装背后的原因是支持 Java-8 实现 java.util.concurrent.Future<T>
的异步编程风格。您可以使用 (.get(2, TimeUnit.SECONDS)
) 阻止调用并希望得到结果(当时应该可用),或者通过 Java-8 Futures.
使用适当的异步方式
我认为 consumer-id
在客户端上不可用;你可以从 metrics
:
得到 client-id
s
@SpringBootApplication
public class So61616543Application {
public static void main(String[] args) {
SpringApplication.run(So61616543Application.class, args).close();
}
@KafkaListener(id = "so61616543", topics = "so61616543")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so61616543").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
return args -> {
registry.getListenerContainer("so61616543").metrics()
.forEach((clientId, metrics) -> {
System.out.println(clientId);
metrics.forEach((key, value) -> System.out.println(key + ":" + value));
});
};
}
}
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer
是否可以在 Spring boot @KafkaListener consumer
中获取上述命令输出中的 consumer-id
信息?
我想将此消费者 ID 添加到 table 以表示处理数据的处理器。
我已经阅读了@gary-russell 在 How to get kafka consumer-id for logging 上的回答,但我没有看到 consumer-id 出现在分区分配的日志中。
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer.info - my-consumer: partitions assigned: [test_topic-7, test_topic-6, test_topic-5, test_topic-4]
[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer.info - my-consumer: partitions assigned: [test_topic-3, test_topic-2, test_topic-1, test_topic-0]
我正在使用 spring boot 2.2.2
依赖项:spring-kafka
您可能希望使用以下使用自动装配的方法链 AdminClient
:
public Collection<MemberDescription> members()
throws InterruptedException, ExecutionException, TimeoutException
{
String group = "my-consumer";
List<String> groups = Collections.singletonList(group);
return adminClient
.describeConsumerGroups(groups) // DescribeConsumerGroupsResult
.describedGroups() // Map<String, KafkaFuture<ConsumerGroupDescription>>
.get(group) // KafkaFuture<ConsumerGroupDescription>
.get(2, TimeUnit.SECONDS) // ConsumerGroupDescription
.members(); // Collection<MemberDescription>
}
MemberDescription
的返回集合包含您的消费者。该对象提供以下 "getters":
MemberDescription::clientId
和CLIENT-ID
个条目。MemberDescription::consumerId
和COMSUMER-ID
个条目。
KafkaFuture
包装背后的原因是支持 Java-8 实现 java.util.concurrent.Future<T>
的异步编程风格。您可以使用 (.get(2, TimeUnit.SECONDS)
) 阻止调用并希望得到结果(当时应该可用),或者通过 Java-8 Futures.
我认为 consumer-id
在客户端上不可用;你可以从 metrics
:
client-id
s
@SpringBootApplication
public class So61616543Application {
public static void main(String[] args) {
SpringApplication.run(So61616543Application.class, args).close();
}
@KafkaListener(id = "so61616543", topics = "so61616543")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so61616543").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
return args -> {
registry.getListenerContainer("so61616543").metrics()
.forEach((clientId, metrics) -> {
System.out.println(clientId);
metrics.forEach((key, value) -> System.out.println(key + ":" + value));
});
};
}
}