Auto-commit of offsets failed & retry also not working as expected
I am using Spring Boot 2.1.9 with Spring Kafka 2.2.9.
I am getting some warnings in the log file saying that the commit failed, and I am using SeekToCurrentErrorHandler to capture the error after the retries are exhausted. But sometimes, if the commit fails, it keeps iterating.
Here is my configuration class:
@Configuration
@EnableKafka
public class KafkaReceiverConfig {

    // Kafka Server Configuration
    @Value("${kafka.servers}")
    private String kafkaServers;

    // Group Identifier
    @Value("${kafka.groupId}")
    private String groupId;

    // Kafka Max Retry Attempts
    @Value("${kafka.retry.maxAttempts:5}")
    private Integer retryMaxAttempts;

    // Kafka Max Retry Interval
    @Value("${kafka.retry.interval:180000}")
    private Long retryInterval;

    // Kafka Concurrency
    @Value("${kafka.concurrency:10}")
    private Integer concurrency;

    // Kafka Poll Timeout
    @Value("${kafka.poll.timeout:100}")
    private Integer pollTimeout;

    // Kafka Consumer Offset
    @Value("${kafka.consumer.auto-offset-reset:earliest}")
    private String offset = "earliest";

    // Logger
    private static final Logger log = LoggerFactory.getLogger(KafkaReceiverConfig.class);

    /**
     * Defines the Max Number of Retry Attempts
     *
     * @return Return the Retry Policy @see {@link RetryPolicy}
     */
    @Bean
    public RetryPolicy retryPolicy() {
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(retryMaxAttempts);
        return simpleRetryPolicy;
    }

    /**
     * Time before the next Retry can happen, the Time used is in Milliseconds
     *
     * @return Return the BackOff Policy @see {@link BackOffPolicy}
     */
    @Bean
    public BackOffPolicy backOffPolicy() {
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(retryInterval);
        return backOffPolicy;
    }

    /**
     * Get Retry Template
     *
     * @return Return the Retry Template @see {@link RetryTemplate}
     */
    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy());
        retryTemplate.setBackOffPolicy(backOffPolicy());
        return retryTemplate;
    }

    /**
     * String Kafka Listener Container Factory
     *
     * @return @see {@link KafkaListenerContainerFactory}
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
            ChainedKafkaTransactionManager<String, String> chainedTM, MessageProducer messageProducer) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        factory.getContainerProperties().setSyncCommits(true);
        factory.setRetryTemplate(retryTemplate());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setTransactionManager(chainedTM);
        factory.setStatefulRetry(true);
        // NOTE: retryMaxAttempts should always be +1 due to a Spring Kafka bug
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
            log.warn("failed to process kafka message (retries are exhausted). topic name:" + record.topic()
                    + " value:" + record.value());
            messageProducer.saveFailedMessage(record, exception);
        }, retryMaxAttempts + 1);
        factory.setErrorHandler(errorHandler);
        log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
        return factory;
    }

    /**
     * String Consumer Factory
     *
     * @return @see {@link ConsumerFactory}
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        log.debug("Kafka Receiver Config consumerFactory created");
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Consumer Configurations
     *
     * @return @see {@link Map}
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new ConcurrentHashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Disable the Auto Commit if required for testing
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        log.debug("Kafka Receiver Config consumerConfigs created");
        return props;
    }
}
Here is the log:
2019-10-30 15:48:05.907 WARN [xxxxx-component-workflow-starter,,,] 11 --- [nt_create-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=fulfillment_create] Synchronous auto-commit of offsets {fulfillment_create-4=OffsetAndMetadata{offset=32, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
- Is there any problem with my configuration file?
- How do I set the max poll and session timeouts and so on? (Please give an example.)
- How do I set up SeekToCurrentErrorHandler in Spring Kafka 2.2.9 so that it works correctly (since I cannot upgrade Spring Kafka due to some other dependencies)?
You are taking too long to process the records returned by the poll(). You need to reduce max.poll.records (ConsumerConfig.MAX_POLL_RECORDS_CONFIG) and/or increase max.poll.interval.ms.
You can't perform seeks after this error occurs - you have lost the partitions.
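As for how setting those two properties could look, here is a minimal sketch of the consumerConfigs() bean from the question with both added. The concrete values (10 records, 10 minutes) are illustrative assumptions, not tuned recommendations:

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new ConcurrentHashMap<String, Object>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    // Fewer records per poll() means less processing work per batch (default is 500).
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
    // More time allowed between poll() calls before the group rebalances
    // (default is 300000, i.e. 5 minutes; 600000 here is just an example).
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
    return props;
}

Whichever values you choose, the whole batch returned by a single poll() must be processed within max.poll.interval.ms; otherwise the consumer is kicked out of the group and the commit fails exactly as in the log above.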