Kafka Consumer 中如何处理错误
How to handle errors in Kafka Consumer
我有以下 Kafka 配置 class:
// Spring configuration that wires the Kafka consumer container factory and the two Avro
// producers used by the debt-collector flow. Dependencies are injected via Lombok's
// @AllArgsConstructor (constructor injection of the properties bean below).
@Configuration
@AllArgsConstructor(access = AccessLevel.PROTECTED)
public class KafkaConfiguration {
// Externalized Kafka settings (bootstrap servers, group id, thread count, schema registry, ...).
private final KafkaConfigurationProperties kafkaConfigurationProperties;
// Listener container factory referenced by name from
// @KafkaListener(containerFactory = "debtCollectorConsumerContainerFactory").
@Bean
public ConcurrentKafkaListenerContainerFactory<String, RepaymentEvent> debtCollectorConsumerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, RepaymentEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfiguration()));
// Number of concurrent consumer threads (bounded in practice by the topic's partition count).
factory.setConcurrency(kafkaConfigurationProperties.getDebtCollectorConsumerThreads());
// NOTE(review): stateful retry only takes effect when a RetryTemplate is also set on the
// factory; none is configured here — confirm this is intentional.
factory.setStatefulRetry(true);
// SeekToCurrentErrorHandler: re-seek the failed record so it is redelivered; after 10 failed
// deliveries the recoverer lambda below is invoked with the record and the exception.
factory.setErrorHandler(new SeekToCurrentErrorHandler((record, exception) -> {
if (exception instanceof SomeCustomException) {
// here I want to manually acknowledge the consuming of the record
}
}, 10));
ContainerProperties containerProperties = factory.getContainerProperties();
// Do not commit offsets for records whose processing threw an exception.
containerProperties.setAckOnError(false);
// Commit the offset after each successfully processed record.
containerProperties.setAckMode(ContainerProperties.AckMode.RECORD);
return factory;
}
// Producer for loan-repayment events; qualified so it can be injected by name.
@Bean
@Qualifier(KAFKA_LOAN_REPAYMENT_PRODUCER)
public Producer<String, RepaymentEvent> loanRepaymentProducer() {
return new KafkaProducer<>(producerConfiguration());
}
// Producer for debt-collector events; shares the same producer configuration.
@Bean
@Qualifier(KAFKA_DEBT_COLLECTOR_PRODUCER)
public Producer<String, RepaymentEvent> debtCollectorProducer() {
return new KafkaProducer<>(producerConfiguration());
}
// Consumer properties: String keys, Avro (specific-record) values, manual offset management
// (auto-commit disabled so the container's AckMode governs commits).
private Map<String, Object> consumerConfiguration() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigurationProperties.getBootstrapServers());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfigurationProperties.getDebtCollectorConsumerGroupId());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConfigurationProperties.getDebtCollectorConsumerAutoOffsetReset());
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConfigurationProperties.getDebtCollectorConsumerMaxPollRecords());
// Deserialize into generated Avro classes rather than GenericRecord.
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, Boolean.TRUE);
properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaConfigurationProperties.getConfluentEndpoint());
// Offsets are committed by the listener container (AckMode.RECORD), not by the client.
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE);
return properties;
}
// Producer properties: String keys, Avro values registered against the Confluent schema registry.
private Map<String, Object> producerConfiguration() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigurationProperties.getBootstrapServers());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaConfigurationProperties.getConfluentEndpoint());
return properties;
}
}
和以下 KafkaListener:
/**
 * Kafka listener that consumes {@code RepaymentEvent}s from the debt-collector incoming topic
 * and starts a repayment-transfer process for {@code RepaymentRequestTransfer} payloads.
 * All other payload types are logged and ignored.
 */
@Slf4j
@Component
@AllArgsConstructor(access = AccessLevel.PROTECTED)
public class DebtCollectorIncomingClient {

    private final RepaymentTransferProcessService repaymentTransferProcessService;

    @KafkaListener(
            topics = "${kafka.debtCollectorIncomingTopic}",
            // FIX: groupId previously resolved "${kafka.debtCollectorConsumerAutoOffsetReset}"
            // (the auto-offset-reset property), which would override the group id from
            // consumerConfiguration() with an unrelated value. Use the consumer group id
            // property, consistent with KafkaConfiguration.consumerConfiguration().
            groupId = "${kafka.debtCollectorConsumerGroupId}",
            containerFactory = "debtCollectorConsumerContainerFactory")
    public void submitMoneyTransferCommand(@Payload RepaymentEvent repaymentEvent) {
        log.info("Receiving command: {}", repaymentEvent);
        // Only transfer-request payloads trigger processing; other event kinds are no-ops here.
        if (repaymentEvent.getPayload() instanceof RepaymentRequestTransfer) {
            RepaymentTransfer repaymentTransfer = aRepaymentTransfer(repaymentEvent);
            repaymentTransferProcessService.startRepaymentTransferProcess(repaymentTransfer);
        }
    }

    /**
     * Maps the Avro {@code RepaymentRequestTransfer} payload onto the domain
     * {@code RepaymentTransfer}. The event's correlation id becomes the external id.
     */
    private RepaymentTransfer aRepaymentTransfer(RepaymentEvent repaymentEvent) {
        RepaymentRequestTransfer repaymentRequestTransfer = (RepaymentRequestTransfer) repaymentEvent.getPayload();
        return RepaymentTransfer.builder()
                .clientId(repaymentRequestTransfer.getClientId())
                .contractId(repaymentRequestTransfer.getContractId())
                // NOTE(review): BigDecimal.valueOf(double) — assumes the Avro amount is a
                // double; confirm precision is acceptable for monetary values.
                .amount(BigDecimal.valueOf(repaymentRequestTransfer.getAmount()))
                .currency(Currency.getInstance(repaymentRequestTransfer.getCurrency().name()))
                .debtCollectorExternalId(repaymentEvent.getCorrelationId())
                .debtType(repaymentRequestTransfer.getDebtType())
                .build();
    }
}
我想使用 SeekToCurrentErrorHandler
进行错误处理,并且我想要一些特定的行为(原帖此处附有一个示例链接,链接已丢失),但目前我正在使用 springBootVersion=2.0.4.RELEASE
、springKafkaVersion=2.1.4.RELEASE
、kafkaVersion=2.0.1
和 confluentVersion=3.3.1
。你能帮我设置依赖项和配置以处理 Kafka 消费者中的错误吗?
此致!
SeekToCurrentErrorHandler
从版本 2.0.1 开始可用。在 2.2 版中添加了附加功能(在一定次数的重试后恢复)。
使用 Spring Boot 2.1.4 时,对应的 Spring for Apache Kafka 版本为 2.2.6(Boot 2.1.5 即将推出)。
几天后,阅读了 Gary 在其他一些帖子中的回答,我终于找到了解决问题的方法。也许这个问题不是很描述性,但这个答案描述了我想要的行为。
在 @Configuration
我正在创建以下 Spring bean:
// Listener container factory for the debt-collector consumer. Blocking (in-memory) retries
// are handled by the RetryTemplate below; once those are exhausted, the custom
// BlockingSeekToCurrentErrorHandler re-seeks the failed record for redelivery.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, RepaymentEvent> debtCollectorConsumerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, RepaymentEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfiguration()));
factory.setConcurrency(kafkaConfigurationProperties.getDebtCollectorConsumerThreads());
// Custom handler: seeks back to the failed record so it is redelivered on the next poll.
factory.setErrorHandler(new BlockingSeekToCurrentErrorHandler());
ContainerProperties containerProperties = factory.getContainerProperties();
// Never commit the offset of a record whose processing failed.
containerProperties.setAckOnError(false);
// Commit the offset after each successfully processed record.
containerProperties.setAckMode(ContainerProperties.AckMode.RECORD);
// Retries run inside the listener thread before the error handler is invoked.
factory.setRetryTemplate(retryTemplate());
return factory;
}
// Builds the blocking retry policy for the listener: back-off between attempts comes from
// backOffPolicy() (defined elsewhere in this configuration — not shown here), and the number
// of attempts is bounded by the configured retry-attempts property.
private RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(backOffPolicy());
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(kafkaConfigurationProperties.getDebtCollectorConsumerRetryAttempts()));
return retryTemplate;
}
和BlockingSeekToCurrentErrorHandler
class:
// SeekToCurrentErrorHandler variant that retries "forever" (Integer.MAX_VALUE attempts),
// emits a metric for each failure, and logs/counts SerializationExceptions instead of
// propagating them, so the container does not block forever on an undeserializable record.
// NOTE(review): `log` implies an SLF4J logger in scope (e.g. an @Slf4j annotation) that is
// not visible in this snippet — confirm.
// NOTE(review): when `records` is empty the exception is silently dropped — confirm intended.
public class BlockingSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {
// Effectively unbounded: the failed record keeps being re-sought until processing succeeds.
private static final int MAX_RETRY_ATTEMPTS = Integer.MAX_VALUE;
BlockingSeekToCurrentErrorHandler() {
super(MAX_RETRY_ATTEMPTS);
}
@Override
public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
try {
if (!records.isEmpty()) {
log.warn("Exception: {} occurred with message: {}", exception, exception.getMessage());
// records.get(0) is the record whose processing failed (the remaining batch follows it).
MetricFactory.handleDebtCollectorIncomingBlockingError(records.get(0), exception);
// Delegate the seek-to-current behaviour (re-seek all unprocessed records) to the parent.
super.handle(exception, records, consumer, container);
}
} catch (SerializationException e) {
// Deserialization failures cannot succeed on redelivery; record a metric and move on.
log.warn("Exception: {} occurred with message: {}", e, e.getMessage());
MetricFactory.handleDebtCollectorIncomingDeserializationError(records, e);
}
}
}
我有以下 Kafka 配置 class:
// Spring configuration that wires the Kafka consumer container factory and the two Avro
// producers used by the debt-collector flow. Dependencies are injected via Lombok's
// @AllArgsConstructor (constructor injection of the properties bean below).
@Configuration
@AllArgsConstructor(access = AccessLevel.PROTECTED)
public class KafkaConfiguration {
// Externalized Kafka settings (bootstrap servers, group id, thread count, schema registry, ...).
private final KafkaConfigurationProperties kafkaConfigurationProperties;
// Listener container factory referenced by name from
// @KafkaListener(containerFactory = "debtCollectorConsumerContainerFactory").
@Bean
public ConcurrentKafkaListenerContainerFactory<String, RepaymentEvent> debtCollectorConsumerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, RepaymentEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfiguration()));
// Number of concurrent consumer threads (bounded in practice by the topic's partition count).
factory.setConcurrency(kafkaConfigurationProperties.getDebtCollectorConsumerThreads());
// NOTE(review): stateful retry only takes effect when a RetryTemplate is also set on the
// factory; none is configured here — confirm this is intentional.
factory.setStatefulRetry(true);
// SeekToCurrentErrorHandler: re-seek the failed record so it is redelivered; after 10 failed
// deliveries the recoverer lambda below is invoked with the record and the exception.
factory.setErrorHandler(new SeekToCurrentErrorHandler((record, exception) -> {
if (exception instanceof SomeCustomException) {
// here I want to manually acknowledge the consuming of the record
}
}, 10));
ContainerProperties containerProperties = factory.getContainerProperties();
// Do not commit offsets for records whose processing threw an exception.
containerProperties.setAckOnError(false);
// Commit the offset after each successfully processed record.
containerProperties.setAckMode(ContainerProperties.AckMode.RECORD);
return factory;
}
// Producer for loan-repayment events; qualified so it can be injected by name.
@Bean
@Qualifier(KAFKA_LOAN_REPAYMENT_PRODUCER)
public Producer<String, RepaymentEvent> loanRepaymentProducer() {
return new KafkaProducer<>(producerConfiguration());
}
// Producer for debt-collector events; shares the same producer configuration.
@Bean
@Qualifier(KAFKA_DEBT_COLLECTOR_PRODUCER)
public Producer<String, RepaymentEvent> debtCollectorProducer() {
return new KafkaProducer<>(producerConfiguration());
}
// Consumer properties: String keys, Avro (specific-record) values, manual offset management
// (auto-commit disabled so the container's AckMode governs commits).
private Map<String, Object> consumerConfiguration() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigurationProperties.getBootstrapServers());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfigurationProperties.getDebtCollectorConsumerGroupId());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConfigurationProperties.getDebtCollectorConsumerAutoOffsetReset());
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConfigurationProperties.getDebtCollectorConsumerMaxPollRecords());
// Deserialize into generated Avro classes rather than GenericRecord.
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, Boolean.TRUE);
properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaConfigurationProperties.getConfluentEndpoint());
// Offsets are committed by the listener container (AckMode.RECORD), not by the client.
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE);
return properties;
}
// Producer properties: String keys, Avro values registered against the Confluent schema registry.
private Map<String, Object> producerConfiguration() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigurationProperties.getBootstrapServers());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaConfigurationProperties.getConfluentEndpoint());
return properties;
}
}
和以下 KafkaListener:
/**
 * Kafka listener that consumes {@code RepaymentEvent}s from the debt-collector incoming topic
 * and starts a repayment-transfer process for {@code RepaymentRequestTransfer} payloads.
 * All other payload types are logged and ignored.
 */
@Slf4j
@Component
@AllArgsConstructor(access = AccessLevel.PROTECTED)
public class DebtCollectorIncomingClient {

    private final RepaymentTransferProcessService repaymentTransferProcessService;

    @KafkaListener(
            topics = "${kafka.debtCollectorIncomingTopic}",
            // FIX: groupId previously resolved "${kafka.debtCollectorConsumerAutoOffsetReset}"
            // (the auto-offset-reset property), which would override the group id from
            // consumerConfiguration() with an unrelated value. Use the consumer group id
            // property, consistent with KafkaConfiguration.consumerConfiguration().
            groupId = "${kafka.debtCollectorConsumerGroupId}",
            containerFactory = "debtCollectorConsumerContainerFactory")
    public void submitMoneyTransferCommand(@Payload RepaymentEvent repaymentEvent) {
        log.info("Receiving command: {}", repaymentEvent);
        // Only transfer-request payloads trigger processing; other event kinds are no-ops here.
        if (repaymentEvent.getPayload() instanceof RepaymentRequestTransfer) {
            RepaymentTransfer repaymentTransfer = aRepaymentTransfer(repaymentEvent);
            repaymentTransferProcessService.startRepaymentTransferProcess(repaymentTransfer);
        }
    }

    /**
     * Maps the Avro {@code RepaymentRequestTransfer} payload onto the domain
     * {@code RepaymentTransfer}. The event's correlation id becomes the external id.
     */
    private RepaymentTransfer aRepaymentTransfer(RepaymentEvent repaymentEvent) {
        RepaymentRequestTransfer repaymentRequestTransfer = (RepaymentRequestTransfer) repaymentEvent.getPayload();
        return RepaymentTransfer.builder()
                .clientId(repaymentRequestTransfer.getClientId())
                .contractId(repaymentRequestTransfer.getContractId())
                // NOTE(review): BigDecimal.valueOf(double) — assumes the Avro amount is a
                // double; confirm precision is acceptable for monetary values.
                .amount(BigDecimal.valueOf(repaymentRequestTransfer.getAmount()))
                .currency(Currency.getInstance(repaymentRequestTransfer.getCurrency().name()))
                .debtCollectorExternalId(repaymentEvent.getCorrelationId())
                .debtType(repaymentRequestTransfer.getDebtType())
                .build();
    }
}
我想使用 SeekToCurrentErrorHandler
进行错误处理,并且我想要一些特定的行为(原帖此处附有一个示例链接,链接已丢失),但目前我正在使用 springBootVersion=2.0.4.RELEASE
、springKafkaVersion=2.1.4.RELEASE
、kafkaVersion=2.0.1
和 confluentVersion=3.3.1
。你能帮我设置依赖项和配置以处理 Kafka 消费者中的错误吗?
此致!
SeekToCurrentErrorHandler
从版本 2.0.1 开始可用。在 2.2 版中添加了附加功能(在一定次数的重试后恢复)。
使用 Spring Boot 2.1.4 时,对应的 Spring for Apache Kafka 版本为 2.2.6(Boot 2.1.5 即将推出)。
几天后,阅读了 Gary 在其他一些帖子中的回答,我终于找到了解决问题的方法。也许这个问题不是很描述性,但这个答案描述了我想要的行为。
在 @Configuration
我正在创建以下 Spring bean:
// Listener container factory for the debt-collector consumer. Blocking (in-memory) retries
// are handled by the RetryTemplate below; once those are exhausted, the custom
// BlockingSeekToCurrentErrorHandler re-seeks the failed record for redelivery.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, RepaymentEvent> debtCollectorConsumerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, RepaymentEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfiguration()));
factory.setConcurrency(kafkaConfigurationProperties.getDebtCollectorConsumerThreads());
// Custom handler: seeks back to the failed record so it is redelivered on the next poll.
factory.setErrorHandler(new BlockingSeekToCurrentErrorHandler());
ContainerProperties containerProperties = factory.getContainerProperties();
// Never commit the offset of a record whose processing failed.
containerProperties.setAckOnError(false);
// Commit the offset after each successfully processed record.
containerProperties.setAckMode(ContainerProperties.AckMode.RECORD);
// Retries run inside the listener thread before the error handler is invoked.
factory.setRetryTemplate(retryTemplate());
return factory;
}
// Builds the blocking retry policy for the listener: back-off between attempts comes from
// backOffPolicy() (defined elsewhere in this configuration — not shown here), and the number
// of attempts is bounded by the configured retry-attempts property.
private RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(backOffPolicy());
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(kafkaConfigurationProperties.getDebtCollectorConsumerRetryAttempts()));
return retryTemplate;
}
和BlockingSeekToCurrentErrorHandler
class:
// SeekToCurrentErrorHandler variant that retries "forever" (Integer.MAX_VALUE attempts),
// emits a metric for each failure, and logs/counts SerializationExceptions instead of
// propagating them, so the container does not block forever on an undeserializable record.
// NOTE(review): `log` implies an SLF4J logger in scope (e.g. an @Slf4j annotation) that is
// not visible in this snippet — confirm.
// NOTE(review): when `records` is empty the exception is silently dropped — confirm intended.
public class BlockingSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {
// Effectively unbounded: the failed record keeps being re-sought until processing succeeds.
private static final int MAX_RETRY_ATTEMPTS = Integer.MAX_VALUE;
BlockingSeekToCurrentErrorHandler() {
super(MAX_RETRY_ATTEMPTS);
}
@Override
public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
try {
if (!records.isEmpty()) {
log.warn("Exception: {} occurred with message: {}", exception, exception.getMessage());
// records.get(0) is the record whose processing failed (the remaining batch follows it).
MetricFactory.handleDebtCollectorIncomingBlockingError(records.get(0), exception);
// Delegate the seek-to-current behaviour (re-seek all unprocessed records) to the parent.
super.handle(exception, records, consumer, container);
}
} catch (SerializationException e) {
// Deserialization failures cannot succeed on redelivery; record a metric and move on.
log.warn("Exception: {} occurred with message: {}", e, e.getMessage());
MetricFactory.handleDebtCollectorIncomingDeserializationError(records, e);
}
}
}