Spring Kafka 消费者 - 打印 Kafka 滞后信息
Spring Kafka Consumer - Print Kafka Lag Info
我创建了一个 spring 读取主题的 kafka 消费者。有没有办法像我们打印分区信息那样打印滞后信息?
有一个命令行工具...
$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group myGroup
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
myTopic 0 66 66 0 -
编辑
您可以运行命令行工具并捕获输出...
Process process = new ProcessBuilder()
.command("/usr/local/bin/kafka-consumer-groups", "--bootstrap-server", "localhost:9092",
"--describe", "--group", "siTestGroup")
.start();
InputStream inputStream = process.getInputStream();
process.waitFor(10, TimeUnit.SECONDS);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FileCopyUtils.copy(inputStream, baos);
System.out.println(new String(baos.toByteArray()));
虽然没有提供源代码,但我假设您通过 @KafkaListener 注释实现了您的消费者。
我已经克服了您描述的使用 org.apache.kafka.clients.consumer.Consumer 界面的相同问题,如 here 所述。可以在@KafkaListener注解下在consumer方法中声明为参数。
该接口提供了 metrics() 方法,其中包含存储在 records-max-lag 属性.[= 中的消费者滞后信息。 15=]
private static final Logger LOGGER = LoggerFactory.getLogger(YourClass.class);
@KafkaListener(topics = "your-topic", groupId = "your-group-id", id = "your-client-id", containerFactory = "kafkaListenerContainerFactory")
public void listenerExample(List<String> msgs, @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment ack,
Consumer<?, ?> consumer) {
String lag = consumer.metrics().values().stream().filter(m -> "records-lag-max".equals(m.metricName().name()))
.map(Metric::metricValue).map(Object::toString).distinct()
.collect(Collectors.joining("", "[Kafka current consumer lag]", " records"));
LOGGER.info(lag);
}
在这种情况下,我明确选择了 records-lag-max 属性。您可以选择任何其他消费者指标,列表位于 Confluent Docs.
上面的代码片段将有以下输出:
[Kafka current consumer lag] X records
其中 X 是此 window.
中任何分区的记录数方面的最大滞后
重要:
我正在使用 Spring Kafka 库
的 2.3.3.RELEASE 版本
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.3.RELEASE</version>
</dependency>
我创建了一个 spring 读取主题的 kafka 消费者。有没有办法像我们打印分区信息那样打印滞后信息?
有一个命令行工具...
$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group myGroup
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
myTopic 0 66 66 0 -
编辑
您可以运行命令行工具并捕获输出...
Process process = new ProcessBuilder()
.command("/usr/local/bin/kafka-consumer-groups", "--bootstrap-server", "localhost:9092",
"--describe", "--group", "siTestGroup")
.start();
InputStream inputStream = process.getInputStream();
process.waitFor(10, TimeUnit.SECONDS);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FileCopyUtils.copy(inputStream, baos);
System.out.println(new String(baos.toByteArray()));
虽然没有提供源代码,但我假设您通过 @KafkaListener 注释实现了您的消费者。 我已经克服了您描述的使用 org.apache.kafka.clients.consumer.Consumer 界面的相同问题,如 here 所述。可以在@KafkaListener注解下在consumer方法中声明为参数。 该接口提供了 metrics() 方法,其中包含存储在 records-max-lag 属性.[= 中的消费者滞后信息。 15=]
private static final Logger LOGGER = LoggerFactory.getLogger(YourClass.class);
@KafkaListener(topics = "your-topic", groupId = "your-group-id", id = "your-client-id", containerFactory = "kafkaListenerContainerFactory")
public void listenerExample(List<String> msgs, @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment ack,
Consumer<?, ?> consumer) {
String lag = consumer.metrics().values().stream().filter(m -> "records-lag-max".equals(m.metricName().name()))
.map(Metric::metricValue).map(Object::toString).distinct()
.collect(Collectors.joining("", "[Kafka current consumer lag]", " records"));
LOGGER.info(lag);
}
在这种情况下,我明确选择了 records-lag-max 属性。您可以选择任何其他消费者指标,列表位于 Confluent Docs.
上面的代码片段将有以下输出:
[Kafka current consumer lag] X records
其中 X 是此 window.
重要:
我正在使用 Spring Kafka 库
的 2.3.3.RELEASE 版本<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.3.RELEASE</version>
</dependency>