metrics method of MessageListenerContainer is not capturing the right values

I am using spring-kafka 2.2.8 to create a batch consumer and am trying to capture my container metrics to understand the batch consumer's performance characteristics.

@Bean
public ConsumerFactory consumerFactory(){
    return new DefaultKafkaConsumerFactory(consumerConfigs(),stringKeyDeserializer(), avroValueDeserializer());
}


@Bean
public FixedBackOffPolicy getBackOffPolicy() {
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(100);
    return backOffPolicy;
}

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaBatchListenerContainerFactory(){
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setStatefulRetry(true);

    return factory;
}


public Map<String, Object> consumerConfigs(){

        Map<String, Object> configs = new HashMap<>();

        batchConsumerConfigProperties.setKeyDeserializerClassConfig();
        batchConsumerConfigProperties.setValueDeserializerClassConfig();
        batchConsumerConfigProperties.setKeyDeserializerClass(StringDeserializer.class);
        batchConsumerConfigProperties.setValueDeserializerClass(KafkaAvroDeserializer.class);
        batchConsumerConfigProperties.setSpecificAvroReader("true");

        batchConsumerConfigProperties.setAutoOffsetResetConfig(environment.getProperty("sapphire.kes.consumer.auto.offset.reset", "earliest"));
        batchConsumerConfigProperties.setEnableAutoCommitConfig(environment.getProperty("sapphire.kes.consumer.enable.auto.commit", "false"));

        batchConsumerConfigProperties.setMaxPollIntervalMs(environment.getProperty(MAX_POLL_INTERVAL_MS_CONFIG, "300000"));
        batchConsumerConfigProperties.setMaxPollRecords(environment.getProperty(MAX_POLL_RECORDS_CONFIG, "50000"));
        batchConsumerConfigProperties.setSessionTimeoutms(environment.getProperty(SESSION_TIMEOUT_MS_CONFIG, "10000"));
        batchConsumerConfigProperties.setRequestTimeOut(environment.getProperty(REQUEST_TIMEOUT_MS_CONFIG, "30000"));
        batchConsumerConfigProperties.setHeartBeatIntervalMs(environment.getProperty(HEARTBEAT_INTERVAL_MS_CONFIG, "3000"));
        batchConsumerConfigProperties.setFetchMinBytes(environment.getProperty(FETCH_MIN_BYTES_CONFIG, "1"));
        batchConsumerConfigProperties.setFetchMaxBytes(environment.getProperty(FETCH_MAX_BYTES_CONFIG, "52428800"));
        batchConsumerConfigProperties.setFetchMaxWaitMS(environment.getProperty(FETCH_MAX_WAIT_MS_CONFIG, "500"));
        batchConsumerConfigProperties.setMaxPartitionFetchBytes(environment.getProperty(MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576"));
        batchConsumerConfigProperties.setConnectionsMaxIdleMs(environment.getProperty(CONNECTIONS_MAX_IDLE_MS_CONFIG, "540000"));
        batchConsumerConfigProperties.setAutoCommitIntervalMS(environment.getProperty(AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"));
        batchConsumerConfigProperties.setReceiveBufferBytes(environment.getProperty(RECEIVE_BUFFER_CONFIG, "65536"));
        batchConsumerConfigProperties.setSendBufferBytes(environment.getProperty(SEND_BUFFER_CONFIG, "131072"));
        // assumption: the populated properties end up in this map; the method
        // must return it for the consumer factory above to see them
        return configs;
}
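For reference, here is a minimal sketch of a consumer config map that is populated directly with the standard ConsumerConfig keys and returned. The custom batchConsumerConfigProperties holder from the question is left out, and the bootstrap-servers property name is only an assumption:

public Map<String, Object> consumerConfigs() {
    Map<String, Object> configs = new HashMap<>();
    // hypothetical bootstrap-servers property name; adjust to your environment
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            environment.getProperty("sapphire.kes.consumer.bootstrap.servers", "localhost:9092"));
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
            environment.getProperty("sapphire.kes.consumer.auto.offset.reset", "earliest"));
    configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
            environment.getProperty("sapphire.kes.consumer.enable.auto.commit", "false"));
    configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
            environment.getProperty(MAX_POLL_RECORDS_CONFIG, "50000"));
    return configs; // the consumer factory only sees what is returned here
}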

Here is my consumer code, where I try to capture the container metrics:

@Component
public class MyBatchConsumer {

    private final KafkaListenerEndpointRegistry registry;
    
    @Autowired
     public MyBatchConsumer(KafkaListenerEndpointRegistry registry) {
            this.registry = registry;
    }

    // assumed field declarations for the counters used below; targetMessageCount of 10 matches the test described later
    private long mybatchconsumerMessageCount;
    private long targetMessageCount = 10;

    @KafkaListener(topics = "myTopic", containerFactory = "kafkaBatchListenerContainerFactory", id = "myBatchConsumer")
    public void consumeRecords(List<ConsumerRecord> messages) {

        System.out.println("messages size - " + messages.size());

        if(mybatchconsumerMessageCount == 0){
            ConsumerPerfTestingConstants.batchConsumerStartTime = System.currentTimeMillis();
            ConsumerPerfTestingConstants.batchConsumerStartDateTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("MM/dd/yyyy HH:mm:ss"));
        }

        mybatchconsumerMessageCount = mybatchconsumerMessageCount + messages.size();
        System.out.println("\n\n\n batchConsumerConsumedMessages " + mybatchconsumerMessageCount);

        if (mybatchconsumerMessageCount == targetMessageCount) {
            System.out.println("ATTENTION! ATTENTION! ATTENTION! Consumer Finished processing " + messageCount + " messages");


            registry.getListenerContainerIds().forEach(
                    listenerId -> System.out.println(" kes batch consumer listenerId is "+listenerId)
            );

            String listenerID = registry.getListenerContainerIds().stream().filter(listenerId -> listenerId.startsWith("myBatchConsumer")).findFirst().get();

            System.out.println(" kes batch consumer listenerID is "+listenerID);

            Map<String, Map<MetricName, ? extends Metric>> metrics = registry.getListenerContainer(listenerID).metrics();
            registry.getListenerContainer(listenerID).stop();
            
            System.out.println("metrics - "+metrics);
        }
    }
}

Now, I tried this with 10 records to see what the metrics look like, and I see the values below but am not sure why. Can someone help me understand what I am missing here?

records-consumed-total = 0
records-consumed-rate = 0

This works fine for me; I am using 2.6.2, but the container simply delegates to the consumer when metrics() is called.
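Conceptually, the concurrent container just merges each child container's consumer metrics, keyed by the consumer's client.id. Here is a rough sketch of that delegation (illustrative only, not the actual spring-kafka source; childContainers is a placeholder name):

public Map<String, Map<MetricName, ? extends Metric>> metrics() {
    Map<String, Map<MetricName, ? extends Metric>> merged = new HashMap<>();
    for (KafkaMessageListenerContainer<Object, Object> child : childContainers) {
        // each child contributes a single entry: its consumer's client.id -> consumer.metrics()
        merged.putAll(child.metrics());
    }
    return merged;
}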

@SpringBootApplication
public class So64878927Application {

    public static void main(String[] args) {
        SpringApplication.run(So64878927Application.class, args);
    }

    @Autowired
    KafkaListenerEndpointRegistry registry;

    @KafkaListener(id = "so64878927", topics = "so64878927")
    void listen(List<String> in) {
        System.out.println(in);
        Map<String, Map<MetricName, ? extends Metric>> metrics = registry.getListenerContainer("so64878927").metrics();
        System.out.println("L: " + metrics.get("consumer-so64878927-1").entrySet().stream()
                .filter(entry -> entry.getKey().name().startsWith("records-consumed"))
                .map(entry -> entry.getValue().metricName().name() + " = " + entry.getValue().metricValue())
                .collect(Collectors.toList()));

        registry.getListenerContainer("so64878927").stop(() -> System.out.println("Stopped"));
    }

    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so64878927").build();
    }

    @EventListener
    void idleEvent(ListenerContainerIdleEvent event) {
        Map<String, Map<MetricName, ? extends Metric>> metrics = registry.getListenerContainer("so64878927").metrics();
        System.out.println("I: " + metrics.get("consumer-so64878927-1").entrySet().stream()
                .filter(entry -> entry.getKey().name().startsWith("records-consumed"))
                .map(entry -> entry.getValue().metricName().name() + " = " + entry.getValue().metricValue())
                .collect(Collectors.toList()));
    }

}

spring.kafka.listener.type=batch
spring.kafka.listener.idle-event-interval=6000

[foo, bar, baz, foo, bar, baz]
L: [records-consumed-total = 6.0, records-consumed-rate = 0.1996472897880411, records-consumed-total = 6.0, records-consumed-rate = 0.1996539331824837]

I am not sure why the metrics are duplicated, but as I said, all we do is call the consumer's metrics() method.
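If you want to see where the repeated entries come from, you can print the full MetricName (group and tags) rather than just the name, for example:

metrics.get("consumer-so64878927-1").keySet().stream()
        .filter(name -> name.name().startsWith("records-consumed"))
        .forEach(name -> System.out.println(name.group() + " " + name.name() + " " + name.tags()));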

By the way, if you want to stop the container from within the listener, you should use the async stop - see my example.
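For example, applied to the container from the question (the id comes from the @KafkaListener annotation above):

// async stop: the callback runs once the container has actually stopped,
// so the listener thread itself is not blocked waiting
registry.getListenerContainer("myBatchConsumer").stop(() ->
        System.out.println("myBatchConsumer container stopped"));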