Spring Kafka 批量消费者的非阻塞重试
Non-blocking retries with Spring Kafka batch consumer
我正在使用 spring-kafka 2.8.0,我正在尝试实施 non-blocking retries for batch kafka consumer。这是我的配置和消费者:
@Configuration
public class KafkaConfig {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GenericRecord>>
batchListenerFactory(ConsumerFactory<Object, Object> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
return factory;
}
}
@Component
public class MyConsumer {
@KafkaListener(
topics = "my-topic",
containerFactory = "batchListenerFactory"
)
@RetryableTopic(
backoff = @Backoff(delay = 1000, multiplier = 2.0),
attempts = "4",
topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE,
autoCreateTopics = "false"
)
public void consume(List<ConsumerRecord<String, GenericRecord>> messages) {
// do some stuff
}
}
但是在 sturtup 上我遇到了以下异常:
java.lang.IllegalArgumentException: The provided class BatchMessagingMessageListenerAdapter is not assignable from AcknowledgingConsumerAwareMessageListener
我的问题是:
有什么方法可以将批量消费者与 @RetryableTopic
结合起来吗?
有没有其他方法实现批量消费者的非阻塞重试?是否可以将 RetryTemplate
用于此目的?
@RetryableTopic
不支持批量侦听器。
RecoveringBatchErrorHandler
(DefaultErrorHandler
for 2.8 and later)支持将批处理中的失败记录发送到死信主题,在监听器的帮助下抛出一个BatchListenerFailedException
指示哪条记录失败了。
然后您必须针对该主题实施自己的侦听器。
我正在使用 spring-kafka 2.8.0,我正在尝试实施 non-blocking retries for batch kafka consumer。这是我的配置和消费者:
@Configuration
public class KafkaConfig {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GenericRecord>>
batchListenerFactory(ConsumerFactory<Object, Object> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
return factory;
}
}
@Component
public class MyConsumer {
@KafkaListener(
topics = "my-topic",
containerFactory = "batchListenerFactory"
)
@RetryableTopic(
backoff = @Backoff(delay = 1000, multiplier = 2.0),
attempts = "4",
topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE,
autoCreateTopics = "false"
)
public void consume(List<ConsumerRecord<String, GenericRecord>> messages) {
// do some stuff
}
}
但是在 sturtup 上我遇到了以下异常:
java.lang.IllegalArgumentException: The provided class BatchMessagingMessageListenerAdapter is not assignable from AcknowledgingConsumerAwareMessageListener
我的问题是:
有什么方法可以将批量消费者与
@RetryableTopic
结合起来吗?有没有其他方法实现批量消费者的非阻塞重试?是否可以将
RetryTemplate
用于此目的?
@RetryableTopic
不支持批量侦听器。
RecoveringBatchErrorHandler
(DefaultErrorHandler
for 2.8 and later)支持将批处理中的失败记录发送到死信主题,在监听器的帮助下抛出一个BatchListenerFailedException
指示哪条记录失败了。
然后您必须针对该主题实施自己的侦听器。