CommitFailedException by Spring Kafka Consumer

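I sometimes get the error message below when using a Spring Kafka consumer. I have implemented at-least-once semantics, as shown in the code snippet.

1) Am I missing any of the consumed messages?

2) Do I need to handle this error? It was not reported by seekToCurrentErrorHandler().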

org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.

My Spring Kafka consumer code snippet:

    public class KafkaConsumerConfig implements KafkaListenerConfigurer {

        // Retries a failed record up to 3 times, 500 ms apart, then logs it and moves on.
        @Bean
        public SeekToCurrentErrorHandler seekToCurrentErrorHandler() {
            return new SeekToCurrentErrorHandler((record, e) -> {
                System.out.println("RECORD from topic " + record.topic() + " at partition " + record.partition()
                        + " at offset " + record.offset() + " did not process correctly due to a " + e.getCause());
            }, new FixedBackOff(500L, 3L));
        }

        @Bean
        public ConsumerFactory<String, ValidatedConsumerClass> consumerFactory() {
            // Wrap the JSON deserializer so deserialization failures reach the error handler.
            ErrorHandlingDeserializer<ValidatedConsumerClass> errorHandlingDeserializer =
                    new ErrorHandlingDeserializer<>(new JsonDeserializer<>(ValidatedConsumerClass.class));

            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "grpid-098");
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
            // Offsets are committed by the listener container, not by the Kafka client.
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

            return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), errorHandlingDeserializer);
        }

        @Bean
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, ValidatedConsumerClass>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, ValidatedConsumerClass> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // Commit the offset after each record has been processed by the listener.
            factory.getContainerProperties().setAckMode(AckMode.RECORD);
            factory.setErrorHandler(seekToCurrentErrorHandler());
            return factory;
        }

        // configureKafkaListeners(...) and the rest of the class are not shown
    }

The consumer that reads the messages:

    @Service
    public class KafKaConsumerService extends AbstractConsumerSeekAware {

        // databaseService is a field of this class (declaration not shown)

        @KafkaListener(id = "foo", topics = "mytopic-5", concurrency = "5", groupId = "mytopic-1-groupid")
        public void consumeFromTopic1(@Payload @Valid ValidatedConsumerClass message, ConsumerRecordMetadata c) {
            databaseService.save(message);

            System.out.println("-- Consumer End -- " + c.partition() + " ---consumer thread-- " + Thread.currentThread().getName());
        }
    }

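  1. No, you are not missing anything.
  2. No, you don't need to handle it. The SeekToCurrentErrorHandler (STCEH) has already handled it, and the record will be redelivered on the next poll.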

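In this case, the exception is thrown outside of record processing, after the listener has finished: the offset commit fails because of a rebalance. The STCEH does not need to re-seek (and could not anyway, because the records are no longer available to it); it simply rethrows the exception.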

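Everything works as expected. The following example reproduces the situation with a listener that takes longer than max.poll.interval.ms: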

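spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.max.poll.interval.ms=5000

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;

@SpringBootApplication
public class So69016372Application {

    public static void main(String[] args) {
        SpringApplication.run(So69016372Application.class, args);
    }

    @KafkaListener(id = "so69016372", topics = "so69016372")
    public void listen(String in, @Header(KafkaHeaders.OFFSET) long offset) throws InterruptedException {
        System.out.println(in + " @" + offset);
        // Sleep longer than max.poll.interval.ms (5000 ms) to force the poll timeout.
        Thread.sleep(6000);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so69016372").partitions(1).replicas(1).build();
    }

}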

Result:

2021-09-01 13:47:26.963  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions assigned: [so69016372-0]
foo @0
2021-09-01 13:47:31.991  INFO 13195 --- [ad | so69016372] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Member consumer-so69016372-1-f02f8d74-c2b8-47d9-92d3-bf68e5c81a8f sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-09-01 13:47:32.989  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Failing OffsetCommit request since the consumer is not part of an active group
2021-09-01 13:47:32.994 ERROR 13195 --- [o69016372-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1602) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.6.jar:2.7.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1139) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1004) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1495) ~[kafka-clients-2.7.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2710) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2705) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2691) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2489) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1235) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 3 common frames omitted

2021-09-01 13:47:32.994  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-09-01 13:47:32.994  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Lost previously assigned partitions so69016372-0
2021-09-01 13:47:32.995  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions lost: [so69016372-0]
2021-09-01 13:47:32.995  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions revoked: [so69016372-0]
...
2021-09-01 13:47:33.102  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions assigned: [so69016372-0]
foo @0
2021-09-01 13:47:38.141  INFO 13195 --- [ad | so69016372] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Member consumer-so69016372-1-e6ec685a-d9aa-43d3-b526-b04418095f09 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-09-01 13:47:39.108  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Failing OffsetCommit request since the consumer is not part of an active group
2021-09-01 13:47:39.109 ERROR 13195 --- [o69016372-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1602) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.6.jar:2.7.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1139) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1004) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1495) ~[kafka-clients-2.7.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2710) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2705) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2691) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2489) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1235) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 3 common frames omitted

2021-09-01 13:47:39.109  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-09-01 13:47:39.109  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Lost previously assigned partitions so69016372-0
2021-09-01 13:47:39.109  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions lost: [so69016372-0]
2021-09-01 13:47:39.109  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions revoked: [so69016372-0]
...
2021-09-01 13:47:39.217  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions assigned: [so69016372-0]
foo @0

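Because the listener always takes longer than max.poll.interval.ms, the commit fails every time, the offset is never advanced, and the same record (foo @0) is redelivered after each rebalance. It will retry indefinitely.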
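
The redelivery loop in the log can be modelled without Kafka. The following is only a toy sketch, not Spring Kafka or kafka-clients code: the class PollTimeoutSimulation and the method deliveredOffsets are hypothetical names, and the model assumes one record per poll, a fixed processing time per record, and that further records exist on the partition.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (no Kafka involved) of commit-after-processing with a poll timeout. */
public class PollTimeoutSimulation {

    /**
     * Returns the offsets handed to the listener over the given number of polls,
     * one record per poll. The offset is committed only if processing finished
     * within maxPollIntervalMs; otherwise the commit is treated as failed and
     * the same offset is delivered again on the next poll.
     */
    public static List<Long> deliveredOffsets(long processingMs, long maxPollIntervalMs, int polls) {
        List<Long> delivered = new ArrayList<>();
        long committed = 0L; // next offset to read, as stored for the simulated group
        for (int i = 0; i < polls; i++) {
            long offset = committed;
            delivered.add(offset); // the listener runs and processes the record

            // Assumption of this model: the consumer is removed from the group
            // exactly when processing takes longer than the poll interval.
            boolean stillInGroup = processingMs <= maxPollIntervalMs;
            if (stillInGroup) {
                committed = offset + 1; // commit succeeds, the position advances
            }
            // Otherwise the commit fails and the consumer resumes from `committed`.
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(deliveredOffsets(6000, 5000, 3)); // prints [0, 0, 0]
        System.out.println(deliveredOffsets(1000, 5000, 3)); // prints [0, 1, 2]
    }
}
```

With 6000 ms of processing against a 5000 ms limit, the simulated consumer never gets past offset 0, which matches the repeated foo @0 in the log. As the Kafka log message itself suggests, increasing max.poll.interval.ms, or reducing max.poll.records when a poll returns more than one record, lets the commit succeed.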