Kafka - CommitFailedException - No timeout poll (max.poll.interval.ms)
I have a Spring Kafka application, and the consumer gets an error when it tries to commit offsets. This is my Kafka consumer configuration:
Kafka Consumer:
@Bean
public ConsumerFactory<String, Item> consumerFactory() {
    log.info("Configuring Kafka Consumer properties - consumerFactory");
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "cpo-executor-groupid");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
            new JsonDeserializer<>(Item.class, false));
}
Kafka Admin:
@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    return new KafkaAdmin(configs);
}
@Bean
public NewTopic processTopic() {
    return TopicBuilder.name(topicName).partitions(2).build();
}
I know this happens when the time between polls exceeds max.poll.interval.ms or session.timeout.ms, but that is not my case. My application takes less than 1 second to consume and process a message:
Time: 11:00:32.773 Configuring Kafka Consumer properties - consumerFactory
Time: 11:00:39.433 INFO [cpo-executor,,] 55630 --- [ restartedMain] o.a.k.clients.consumer.ConsumerConfig
Time: 11:00:57.293 ERROR [cpo-executor,,] 55630 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator:
[Consumer clientId=consumer-cpo-executor-groupid-1, groupId=cpo-executor-groupid] Offset commit failed on partition process-topic-1 at offset 95:
The coordinator is not aware of this member.
Time: 11:00:57.299 ERROR [cpo-executor,,] 55630 --- [ntainer#0-0-C-1] c.c.cpoexecutor.config.KafkaErrHandler : Error in process with Exception org.apache.kafka.clients.consumer.CommitFailedException:
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing.
You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. and the records are []
Since I have not changed any Kafka configuration and the default value of max.poll.interval.ms is 5 minutes, what is happening?
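To put a number on the log above, here is a small sketch that computes the gap between the ConsumerConfig line and the failed commit and compares it with the 5-minute default. The class and method names (LogGap, gap) are hypothetical, the timestamps are copied from the log, and it assumes the ConsumerConfig line marks the moment the consumer was created.

```java
import java.time.Duration;
import java.time.LocalTime;

public class LogGap {
    // Elapsed time between two HH:mm:ss.SSS log timestamps taken on the same day.
    static Duration gap(String from, String to) {
        return Duration.between(LocalTime.parse(from), LocalTime.parse(to));
    }

    public static void main(String[] args) {
        // Timestamps copied from the ConsumerConfig line and the first ERROR line.
        Duration elapsed = gap("11:00:39.433", "11:00:57.293");
        // Default max.poll.interval.ms mentioned in the question (5 minutes).
        Duration maxPollInterval = Duration.ofMinutes(5);

        System.out.println("elapsed ms: " + elapsed.toMillis());
        System.out.println("exceeds max.poll.interval.ms: "
                + (elapsed.compareTo(maxPollInterval) > 0));
    }
}
```

The gap is 17860 ms, far below 5 minutes, so under that assumption the consumer cannot have gone longer than max.poll.interval.ms without polling before the commit failed.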
kafka: 2.13-2.8.0
spring-kafka: 2.7.6
spring: 2.4.2
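One way to rule out an overridden default is to set the timeout-related consumer properties explicitly. The following is a minimal sketch, not the configuration actually in use: the class and method names (ExplicitTimeouts, timeoutProps) are hypothetical, and the values are what I understand to be the defaults for the 2.x Java client, so they should be checked against the ConsumerConfig dump the client logs at startup. The string keys are the standard consumer property names.

```java
import java.util.HashMap;
import java.util.Map;

public class ExplicitTimeouts {
    // Returns consumer properties with the timeout-related settings spelled out.
    // Values are assumed 2.x client defaults; verify them against the logged ConsumerConfig.
    static Map<String, Object> timeoutProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("max.poll.interval.ms", 300_000); // max time allowed between poll() calls
        props.put("max.poll.records", 500);         // max records returned by one poll()
        props.put("session.timeout.ms", 10_000);    // heartbeat-based liveness timeout
        props.put("heartbeat.interval.ms", 3_000);  // how often heartbeats are sent
        return props;
    }

    public static void main(String[] args) {
        // Print each setting so it can be compared with the client's startup log.
        timeoutProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In the consumerFactory() bean, these entries could be merged into the existing map with props.putAll(ExplicitTimeouts.timeoutProps()) before the factory is created.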
I have already changed the groupId per environment, so the same groupId is never used twice on the same Kafka cluster.
Kafka Consumer:
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String offSetReset;
@Bean
public ConsumerFactory<String, Item> consumerFactory() {
    log.info("Configuring Kafka Consumer properties - consumerFactory");
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offSetReset);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
application-local.yml:
spring:
  kafka:
    bootstrap-servers: #####:9092
    consumer:
      auto-offset-reset: earliest
      group-id: cpo-executor-groupid-local
application-dev.yml:
spring:
  kafka:
    bootstrap-servers: #####:9092
    consumer:
      auto-offset-reset: earliest
      group-id: cpo-executor-groupid-dev
application-prd.yml:
spring:
  kafka:
    consumer:
      bootstrap-servers: #####:9092
      auto-offset-reset: earliest
      group-id: cpo-executor-groupid-prd