Issue with consumer config 'session.timeout.ms' while processing long-running records

I am using spring-kafka '2.1.7.RELEASE', and these are my consumer settings.

    public Map<String, Object> setConsumerConfigs() {

           Map<String, Object> configs = new HashMap<>();

           configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
           configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

           configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
           configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);

           configs.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, stringDeserializerClass);
           configs.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, kafkaAvroDeserializerClass.getName());

           configs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Collections.singletonList(RoundRobinAssignor.class));

           // Set this to true so that the consumer record value comes as your pre-defined contract instead of a generic record
           sapphireKafkaConsumerConfig.setSpecificAvroReader("true");

           return configs;
       }

Here is my factory setup

        @Bean
         public <K,V> ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
           ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
           factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(setConsumerConfigs()));
           factory.getContainerProperties().setMissingTopicsFatal(false);

           factory.getContainerProperties().setAckMode(AckMode.RECORD);

           factory.setErrorHandler(myCustomKafkaSeekToCurrentErrorHandler);
           factory.setRetryTemplate(retryTemplate());
           factory.setRecoveryCallback(myCustomKafkaRecoveryCallback);
           factory.setStatefulRetry(true);
           return factory;
         }

         public RetryTemplate retryTemplate() {
            RetryTemplate retryTemplate = new RetryTemplate();
            retryTemplate.setListeners(new RetryListener[]{myCustomKafkaRetryListener});
            retryTemplate.setRetryPolicy(myCustomKafkaConsumerRetryPolicy);

            FixedBackOffPolicy backOff = new FixedBackOffPolicy();
            backOff.setBackOffPeriod(1000);
            retryTemplate.setBackOffPolicy(backOff);


            return retryTemplate;
          }

Here is my consumer, where I added a 6-minute delay, which is longer than the default max.poll.interval.ms (300000 ms, i.e. 5 minutes).

@KafkaListener(topics = TestConsumerConstants.CONSUMER_LONGRUNNING_RECORDS_PROCESSSING_TEST_TOPIC
      , clientIdPrefix = "CONSUMER_LONGRUNNING_RECORDS_PROCESSSING"
      , groupId = "kafka-lib-comp-test-consumers")
  public void consumeLongRunningRecord(ConsumerRecord message) throws InterruptedException {
    System.out.println(String.format("\n \n Received message at %s offset %s of partition %s of topic %s with key %s \n\n", DateTime.now(),
        message.offset(), message.partition(), message.topic(), message.key()));

    TimeUnit.MINUTES.sleep(6);

    System.out.println(String.format("\n \n Processing done for the message at %s offset %s of partition %s of topic %s with key %s \n\n", DateTime.now(),
        message.offset(), message.partition(), message.topic(), message.key()));
  }

Now I get the error below, and the same record is processed again and again because the offset cannot be committed (which is expected).

Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

Then I tried setting 'session.timeout.ms' = 420000. Now I get the error below, even though I have not set any values for group.min.session.timeout.ms or group.max.session.timeout.ms. Their defaults are 6000 and 1800000 respectively, so 420000 should be within that range. Can anyone help me understand why this error occurs?

Caused by: org.apache.kafka.common.errors.InvalidSessionTimeoutException: The session timeout is not within the range allowed by the broker (as configured by group.min.session.timeout.ms and group.max.session.timeout.ms). 
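To make the puzzle concrete, here is a hypothetical sketch (not the actual broker code) of the check behind `InvalidSessionTimeoutException`: the requested session timeout must fall within the broker's `[group.min.session.timeout.ms, group.max.session.timeout.ms]` window. With the documented defaults, 420000 passes, which suggests the broker in question may be running with overridden limits:

```java
public class SessionTimeoutCheck {

    // Hypothetical illustration of the broker-side range check; the
    // real validation lives in the broker's group coordinator.
    static boolean isValid(long sessionTimeoutMs, long minMs, long maxMs) {
        return sessionTimeoutMs >= minMs && sessionTimeoutMs <= maxMs;
    }

    public static void main(String[] args) {
        // With the default limits (6000 and 1800000), 420000 is in range.
        System.out.println(isValid(420000L, 6000L, 1800000L));   // true
        // A broker configured with, say, group.max.session.timeout.ms=300000
        // would reject the same request.
        System.out.println(isValid(420000L, 6000L, 300000L));    // false
    }
}
```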

I don't know why you are getting that error, but the session timeout is no longer relevant here; see KIP-62. Perhaps the defaults have changed and the documentation was not updated.

You need to increase max.poll.interval.ms to avoid the rebalance.
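As a minimal sketch of that fix: add the two overrides below to the consumer config map. Literal config keys are used here instead of the `ConsumerConfig` constants so the snippet is self-contained; the class and method names are illustrative, not from the question:

```java
import java.util.HashMap;
import java.util.Map;

public class LongRunningConsumerConfigs {

    // Overrides for a listener that can block for up to 6 minutes per record.
    static Map<String, Object> longRunningOverrides() {
        Map<String, Object> configs = new HashMap<>();
        // Allow up to 7 minutes between poll() calls, so the 6-minute
        // processing delay no longer triggers a group rebalance.
        configs.put("max.poll.interval.ms", 420000);
        // Fetch one record per poll so each poll-loop iteration stays short.
        configs.put("max.poll.records", 1);
        return configs;
    }

    public static void main(String[] args) {
        System.out.println(longRunningOverrides());
    }
}
```

These entries would be merged into the map built by `setConsumerConfigs()` above.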