Spring Kafka Retry 是否在不同分区之间挑选消息?

Does Spring Kafka Retry pick messages amongst different partitions?

我想知道 Spring Kafka 如何处理分配给一个实例的多个分区的重试。 Spring Kafka 是根据重试策略和退避策略不断重试相同的消息,还是重试,在重试之间,是否从其他分区发送消息?

是行为:

A)重试消息->重试消息->重试消息

B)重试消息->其他消息->重试消息->重试消息

我查看了其他 Whosebug 问题,这些问题似乎证实了给定一个分区 Spring Kafka 不会移动到另一个偏移量,但是没有关于如果分配了多个分区会有什么行为的信息到实例。我已经实现了一个具有重试模板和有状态重试的工厂。

@Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        ListenerExceptions listenerExceptions = new ListenerExceptions();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(KafkaProperties.CONCURRENCY);
        factory.getContainerProperties().setPollTimeout(KafkaProperties.POLL_TIMEOUT_VLAUE);
        factory.setRetryTemplate(retryTemplate());
        factory.setErrorHandler(new SeekToCurrentErrorHandler());
        factory.setStatefulRetry(true);
        factory.setRecoveryCallback((RetryContext context) -> listenerExceptions.recover(context));
        return factory;
    }

上述工厂的重试配置委托给RetryingMessageListenerAdapter,逻辑是这样的:

public void onMessage(final ConsumerRecord<K, V> record, final Acknowledgment acknowledgment,
        final Consumer<?, ?> consumer) {
    RetryState retryState = null;
    if (this.stateful) {
        retryState = new DefaultRetryState(record.topic() + "-" + record.partition() + "-" + record.offset());
    }
    getRetryTemplate().execute(context -> {
                context.setAttribute(CONTEXT_RECORD, record);
                switch (RetryingMessageListenerAdapter.this.delegateType) {
                    case ACKNOWLEDGING_CONSUMER_AWARE:
                        context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
                        context.setAttribute(CONTEXT_CONSUMER, consumer);
                        RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment, consumer);
                        break;
                    case ACKNOWLEDGING:
                        context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
                        RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment);
                        break;
                    case CONSUMER_AWARE:
                        context.setAttribute(CONTEXT_CONSUMER, consumer);
                        RetryingMessageListenerAdapter.this.delegate.onMessage(record, consumer);
                        break;
                    case SIMPLE:
                        RetryingMessageListenerAdapter.this.delegate.onMessage(record);
                }
                return null;
            },
            getRecoveryCallback(), retryState);
}

因此,我们会重试每条消息。根据 Apache Kafka 的建议,我们在一个线程中处理一个分区,因此在重试耗尽或调用成功之前,不会处理该分区中的每条下一条记录。

根据您的多分区情况和factory.setConcurrency(KafkaProperties.CONCURRENCY);配置,可能是不同的分区在不同的线程中处理。因此,可能会同时重试来自不同分区的不同记录。只是因为重试绑定到线程和调用堆栈。