在批处理消费者中使用@KafkaListener 处理错误提交
Handling commits for errors with @KafkaListener in batch consumers
我们有如下所示的 Kafka 消费者设置
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
final Map<String, Object> props = kafkaProperties.buildConsumerProperties();
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> batchFactory(
final ConsumerFactory<String, Object> consumerFactory,
@Value("${someProp.batch}") final boolean enableBatchListener,
@Value("${someProp.concurrency}") final int consumerConcurrency,
@Value("${someProp.error.backoff.ms}") final int errorBackoffInterval
) {
final SeekToCurrentBatchErrorHandler errorHandler = new SeekToCurrentBatchErrorHandler();
errorHandler.setBackOff(new FixedBackOff(errorBackoffInterval, UNLIMITED_ATTEMPTS));
final var containerFactory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
containerFactory.setConsumerFactory(consumerFactory);
containerFactory.getContainerProperties().setAckMode(MANUAL_IMMEDIATE);
containerFactory.getContainerProperties().setMissingTopicsFatal(false);
containerFactory.setBatchListener(enableBatchListener);
containerFactory.setConcurrency(consumerConcurrency);
containerFactory.setBatchErrorHandler(errorHandler);
return containerFactory;
}
someProp:
concurrency: 16
batch: true
error.backoff.ms: 2000
spring:
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
consumer:
groupId: some-grp
autoOffsetReset: earliest
keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
valueDeserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
schema.registry.url: ${SCHEMA_REGISTRY_URL}
specific.avro.reader: true
security.protocol: SSL
在用@KafkaListener
注释的批处理监听器方法中,我们在列表处理结束时调用acknowledgment.acknowledge()
。假设当服务启动时,我已经在主题中有一百万条消息可供服务使用,我对这种情况有以下问题,因为我找不到详细讨论批量监听的文档:
- 监听器将读取列表中的 500 条消息。 500,因为
max.poll.records
未设置,因此默认为 500,因此列表将包含 500 条消息。这个理解对吗?
- 鉴于以上情况,消费者并发性从何而来?所述配置是否意味着我将有 16 个消费者,每个消费者可以从同一主题并行读取 500 条消息?
- 我明白,在这种情况下,我必须至少有 16 个分区才能使用所有消费者,否则我会剩下什么都不做的消费者?
- 由于
SeekToCurrentBatchErrorHandler
,如果侦听器方法内部处理出现任何异常,将重放该批处理。因此,如果在特定批次中处理第 50 条消息时出现异常,将再次播放前 49 条消息(基本上是重复的,我没意见),接下来的 50 到 500 条消息将被播放并尝试照常处理。这个理解对吗?
- 如果连续读取多个批次并且特定的消费者线程卡在
SeekToCurrentBatchErrorHandler
,如何处理偏移量提交,因为其他消费者线程仍会成功处理消息从而移动偏移量指针前进然后卡住的消费者偏移
MANUAL_IMMEDIATE
的文档指出
/**
* User takes responsibility for acks using an
* {@link AcknowledgingMessageListener}. The consumer
* immediately processes the commit.
*/
MANUAL_IMMEDIATE,
这是否意味着调用 acknowledgment.acknowledge()
是不够的,必须以某种方式使用 AcknowledgingMessageListener
?如果是,首选方法是什么。
您将获得“最多”500;不能保证您会得到恰好 500 个。
是; 16 个消费者(假设您至少有 16 个分区)。
正确。
正确;但是 2.5 版现在有了 RecoveringBatchErrorHandler
,您可以抛出一个特殊的异常来告诉它在批处理中的哪个位置发生了错误;它将提交成功记录的偏移量并寻找剩余的记录。
消费者获得唯一分区,因此“卡住”的消费者不会影响其他消费者。
我不确定你在那里问什么;如果您正在调用 ack.acknowledge()
,那么您已经在使用 AcknowledgingMessageListener
(@KafkaListener
始终具有该功能;我们仅使用手动确认模式填充确认。
但是,您真的不需要为此用例使用手动确认;当监听器正常退出时,容器会自动提交偏移量;无需使您的代码不必要地复杂化。
我们有如下所示的 Kafka 消费者设置
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
final Map<String, Object> props = kafkaProperties.buildConsumerProperties();
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> batchFactory(
final ConsumerFactory<String, Object> consumerFactory,
@Value("${someProp.batch}") final boolean enableBatchListener,
@Value("${someProp.concurrency}") final int consumerConcurrency,
@Value("${someProp.error.backoff.ms}") final int errorBackoffInterval
) {
final SeekToCurrentBatchErrorHandler errorHandler = new SeekToCurrentBatchErrorHandler();
errorHandler.setBackOff(new FixedBackOff(errorBackoffInterval, UNLIMITED_ATTEMPTS));
final var containerFactory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
containerFactory.setConsumerFactory(consumerFactory);
containerFactory.getContainerProperties().setAckMode(MANUAL_IMMEDIATE);
containerFactory.getContainerProperties().setMissingTopicsFatal(false);
containerFactory.setBatchListener(enableBatchListener);
containerFactory.setConcurrency(consumerConcurrency);
containerFactory.setBatchErrorHandler(errorHandler);
return containerFactory;
}
someProp:
concurrency: 16
batch: true
error.backoff.ms: 2000
spring:
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
consumer:
groupId: some-grp
autoOffsetReset: earliest
keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
valueDeserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
schema.registry.url: ${SCHEMA_REGISTRY_URL}
specific.avro.reader: true
security.protocol: SSL
在用@KafkaListener
注释的批处理监听器方法中,我们在列表处理结束时调用acknowledgment.acknowledge()
。假设当服务启动时,我已经在主题中有一百万条消息可供服务使用,我对这种情况有以下问题,因为我找不到详细讨论批量监听的文档:
- 监听器将读取列表中的 500 条消息。 500,因为
max.poll.records
未设置,因此默认为 500,因此列表将包含 500 条消息。这个理解对吗? - 鉴于以上情况,消费者并发性从何而来?所述配置是否意味着我将有 16 个消费者,每个消费者可以从同一主题并行读取 500 条消息?
- 我明白,在这种情况下,我必须至少有 16 个分区才能使用所有消费者,否则我会剩下什么都不做的消费者?
- 由于
SeekToCurrentBatchErrorHandler
,如果侦听器方法内部处理出现任何异常,将重放该批处理。因此,如果在特定批次中处理第 50 条消息时出现异常,将再次播放前 49 条消息(基本上是重复的,我没意见),接下来的 50 到 500 条消息将被播放并尝试照常处理。这个理解对吗? - 如果连续读取多个批次并且特定的消费者线程卡在
SeekToCurrentBatchErrorHandler
,如何处理偏移量提交,因为其他消费者线程仍会成功处理消息从而移动偏移量指针前进然后卡住的消费者偏移 MANUAL_IMMEDIATE
的文档指出
/**
* User takes responsibility for acks using an
* {@link AcknowledgingMessageListener}. The consumer
* immediately processes the commit.
*/
MANUAL_IMMEDIATE,
这是否意味着调用 acknowledgment.acknowledge()
是不够的,必须以某种方式使用 AcknowledgingMessageListener
?如果是,首选方法是什么。
您将获得“最多”500;不能保证您会得到恰好 500 个。
是; 16 个消费者(假设您至少有 16 个分区)。
正确。
正确;但是 2.5 版现在有了
RecoveringBatchErrorHandler
,您可以抛出一个特殊的异常来告诉它在批处理中的哪个位置发生了错误;它将提交成功记录的偏移量并寻找剩余的记录。消费者获得唯一分区,因此“卡住”的消费者不会影响其他消费者。
我不确定你在那里问什么;如果您正在调用
ack.acknowledge()
,那么您已经在使用AcknowledgingMessageListener
(@KafkaListener
始终具有该功能;我们仅使用手动确认模式填充确认。
但是,您真的不需要为此用例使用手动确认;当监听器正常退出时,容器会自动提交偏移量;无需使您的代码不必要地复杂化。