In Spring-Kafka, which method does the same thing as onPartitionsRevoked?

I know that in Spring-Kafka we have the following methods:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

But which one does the same thing as the native ConsumerRebalanceListener method onPartitionsRevoked?

"This method will be called before a rebalance operation starts and after the consumer stops fetching data. It is recommended that offsets should be committed in this callback to either Kafka or a custom offset store to prevent duplicate data."

If I want to implement ConsumerRebalanceListener, how do I pass in the KafkaConsumer reference? I only see the Consumer that comes from Spring-Kafka.

=========UPDATE======

Hi Gary, when I add the RebalanceListener to the ContainerProperties, I can see both methods being triggered. However, I get an exception saying "Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing". Do you have any idea what is going on?

===========UPDATE 2 ============

    public ConcurrentMessageListenerContainer<Integer, String> createContainer(
      ContainerProperties containerProps, IKafkaConsumer iKafkaConsumer) {

    Map<String, Object> props = consumerProps();

    DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);

    // The line in question: the listener is handed a separately created consumer.
    RebalanceListner rebalanceListner = new RebalanceListner(cf.createConsumer());

    CustomKafkaMessageListener ckml = new CustomKafkaMessageListener(iKafkaConsumer, rebalanceListner);

    CustomRecordFilter cff = new CustomRecordFilter();

    FilteringAcknowledgingMessageListenerAdapter faml = new FilteringAcknowledgingMessageListenerAdapter(ckml, cff, true);

    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(5);

    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(1500); // 1.5 seconds

    RetryTemplate rt = new RetryTemplate();
    rt.setBackOffPolicy(backOffPolicy);
    rt.setRetryPolicy(retryPolicy);
    rt.registerListener(ckml);
    RetryingAcknowledgingMessageListenerAdapter rml = new RetryingAcknowledgingMessageListenerAdapter(faml, rt);

    containerProps.setConsumerRebalanceListener(rebalanceListner);
    containerProps.setMessageListener(rml);
    containerProps.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    containerProps.setErrorHandler(ckml);
    containerProps.setAckOnError(false);
    ConcurrentMessageListenerContainer<Integer, String> container = new ConcurrentMessageListenerContainer<>(
        cf, containerProps);

    container.setConcurrency(1);
    return container;
  }

You can add a RebalanceListener to the container's ContainerProperties, which are passed into the container's constructor.
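As a minimal sketch of what that could look like: Spring-Kafka (2.0 and later, as far as I know) also provides a ConsumerAwareRebalanceListener, whose callbacks receive the container's own Consumer, so there is no need to create and pass in a second consumer. The class name RebalanceListenerConfig, the helper withRebalanceListener, and the processed map are hypothetical names used only for illustration, and the ContainerProperties import assumes version 2.2 or later (older versions have it in the listener.config package).

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties;

public class RebalanceListenerConfig {

    // Hypothetical bookkeeping: the message listener would record, per partition,
    // the next offset to commit after each successfully processed record.
    static final Map<TopicPartition, OffsetAndMetadata> processed = new ConcurrentHashMap<>();

    public static ContainerProperties withRebalanceListener(String topic) {
        ContainerProperties containerProps = new ContainerProperties(topic);

        containerProps.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

            @Override
            public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
                    Collection<TopicPartition> partitions) {
                // Commit what has been processed for the partitions being revoked,
                // using the consumer the container passes in (the one that owns them).
                Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
                for (TopicPartition tp : partitions) {
                    OffsetAndMetadata offset = processed.remove(tp);
                    if (offset != null) {
                        toCommit.put(tp, offset);
                    }
                }
                if (!toCommit.isEmpty()) {
                    consumer.commitSync(toCommit);
                }
            }

            @Override
            public void onPartitionsAssigned(Consumer<?, ?> consumer,
                    Collection<TopicPartition> partitions) {
                // Optionally seek to offsets kept in a custom store here.
            }
        });
        return containerProps;
    }
}
```

The resulting ContainerProperties would then be passed to the container constructor together with the consumer factory, as in the createContainer method above. Committing through a different consumer instance than the one that owns the partitions is one plausible cause of the "Commit cannot be completed" exception, though that is an assumption rather than something confirmed here.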