Spring Kafka 消费者重试,退避间隔时间长 "org.apache.kafka.clients.consumer.CommitFailedException"
Spring Kafka Consumer Retry with backoff interval of long duration giving "org.apache.kafka.clients.consumer.CommitFailedException"
我是 Spring-Kafka 的新手,正在尝试使用 Spring Kafka RetryTemplate 在发生故障或发生任何异常时重试 Kafka 消息。
我使用了以下代码:
//这是KafkaListenerContainerFactory:
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryRetry() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(retryContext -> {
ConsumerRecord consumerRecord = (ConsumerRecord) retryContext.getAttribute("record");
logger.info("Recovery is called for message {} ", consumerRecord.value());
return Optional.empty();
});
return factory;
}
// 重试模板
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
// Todo: take from config
fixedBackOffPolicy.setBackOffPeriod(240000);// 240seconds
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
// Todo: take from config
simpleRetryPolicy.setMaxAttempts(3);
retryTemplate.setRetryPolicy(simpleRetryPolicy);
return retryTemplate;
}
//
这是消费者工厂
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
当发生任何异常时,都会按照重试策略按预期重试。 max重试一次,调用recovery回调方法。
但不久之后,它给出 "java.lang.IllegalStateException: This error handler cannot process
'org.apache.kafka.clients.consumer.CommitFailedException's;没有可用的记录信息,其中包含一些详细信息,例如:
OffsetCommit 请求失败,因为消费者不属于活动组。
似乎无法提交偏移量,因为消费者现在已被踢出组,因为
它在下一次轮询之前长时间闲置 (backoffperiod*(maxretry-1))。
我需要添加一些大值的 max.poll.interval.ms 吗?
是否有任何其他方法可以实现此目的,即使消费者在处理过程中花费了如此多的时间并计划以较长的间隔重试,也不会出现此提交失败错误。
请帮我解决这个问题。
总退避延迟必须小于 max.poll.interval.ms
以避免重新平衡。
现在首选使用 SeekToCurrentErrorHandler
而不是 RetryTemplate
因为这样只有每个延迟(而不是总延迟)需要小于 max.poll.interval.ms
我是 Spring-Kafka 的新手,正在尝试使用 Spring Kafka RetryTemplate 在发生故障或发生任何异常时重试 Kafka 消息。
我使用了以下代码:
//这是KafkaListenerContainerFactory:
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryRetry() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(retryContext -> {
ConsumerRecord consumerRecord = (ConsumerRecord) retryContext.getAttribute("record");
logger.info("Recovery is called for message {} ", consumerRecord.value());
return Optional.empty();
});
return factory;
}
// 重试模板
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
// Todo: take from config
fixedBackOffPolicy.setBackOffPeriod(240000);// 240seconds
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
// Todo: take from config
simpleRetryPolicy.setMaxAttempts(3);
retryTemplate.setRetryPolicy(simpleRetryPolicy);
return retryTemplate;
}
//
这是消费者工厂
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
当发生任何异常时,都会按照重试策略按预期重试。 max重试一次,调用recovery回调方法。 但不久之后,它给出 "java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's;没有可用的记录信息,其中包含一些详细信息,例如: OffsetCommit 请求失败,因为消费者不属于活动组。
似乎无法提交偏移量,因为消费者现在已被踢出组,因为 它在下一次轮询之前长时间闲置 (backoffperiod*(maxretry-1))。
我需要添加一些大值的 max.poll.interval.ms 吗?
是否有任何其他方法可以实现此目的,即使消费者在处理过程中花费了如此多的时间并计划以较长的间隔重试,也不会出现此提交失败错误。
请帮我解决这个问题。
总退避延迟必须小于 max.poll.interval.ms
以避免重新平衡。
现在首选使用 SeekToCurrentErrorHandler
而不是 RetryTemplate
因为这样只有每个延迟(而不是总延迟)需要小于 max.poll.interval.ms