Spring Kafka 批量消费者的非阻塞重试

Non-blocking retries with Spring Kafka batch consumer

我正在使用 spring-kafka 2.8.0,我正在尝试实施 non-blocking retries for batch kafka consumer。这是我的配置和消费者:

@Configuration
public class KafkaConfig {
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GenericRecord>> 
            batchListenerFactory(ConsumerFactory<Object, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        return factory;
    }
}
@Component
public class MyConsumer {

    @KafkaListener(
            topics = "my-topic",
            containerFactory = "batchListenerFactory"
    )
    @RetryableTopic(
            backoff = @Backoff(delay = 1000, multiplier = 2.0),
            attempts = "4",
            topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE,
            autoCreateTopics = "false"
    )
    public void consume(List<ConsumerRecord<String, GenericRecord>> messages) {
        // do some stuff
    }

}

但是在 sturtup 上我遇到了以下异常: java.lang.IllegalArgumentException: The provided class BatchMessagingMessageListenerAdapter is not assignable from AcknowledgingConsumerAwareMessageListener

我的问题是:

  1. 有什么方法可以将批量消费者与 @RetryableTopic 结合起来吗?

  2. 有没有其他方法实现批量消费者的非阻塞重试?是否可以将 RetryTemplate 用于此目的?

@RetryableTopic 不支持批量侦听器。

RecoveringBatchErrorHandlerDefaultErrorHandler for 2.8 and later)支持将批处理中的失败记录发送到死信主题,在监听器的帮助下抛出一个BatchListenerFailedException指示哪条记录失败了。

然后您必须针对该主题实施自己的侦听器。