如何用SeekToCurrentErrorHandler实现ErrorHandlingDeserializer并在自定义topic上发布错误记录
How to implement ErrorHandlingDeserializer with SeekToCurrentErrorHandler and publish error records on custom topic
我正在尝试在 spring-kafka 中编写 Kafka 消费者应用程序。我可以想到 2 种可能发生错误的情况:
- 处理记录时,服务层可能发生异常(在table中通过API更新记录)
- 反序列化错误
我已经探索了处理 场景 1 的选项,我可以在我的代码中抛出异常并使用 SeekToCurrentErrorHandler
.
处理它
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 2L)));
return factory;
}
对于场景 2,我有一个选项 ErrorHandlingDeserializer
,但我不确定如何使用 SeekToCurrentErrorHandler
来实现它。有没有办法使用 SeekToCurrentErrorHandler 为这两种情况包含 ErrorHandler。
我的属性class如下:
@Bean
public ConsumerFactory<String, String> consumerFactory(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_BROKERS);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL_PROTOCOL);
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,SSL_TRUSTSTORE_LOCATION_FILE_NAME);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_SECURE);
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,SSL_KEYSTORE_LOCATION_FILE_NAME);
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_SECURE);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, SSL_KEY_SECURE);
return new DefaultKafkaConsumerFactory<>(props);
}
发布错误记录:
我也在考虑将错误记录发布到死信队列。对于场景 1,它应该重试并在死信队列上发布,对于场景 2,它应该直接发布,因为重试没有任何好处。我可能无法自己创建一个主题,并且需要让我的生产者也为错误记录创建一个主题。如何实现逻辑以在自定义错误主题上发布记录。
因为如果我使用 DeadLetterPublishingRecoverer
,我无法控制名称。根据我的理解,它使用 .DLT.
创建主题
SeekToCurrentErrorHandler
将某些异常(例如 DeserializationException
)视为致命异常并且不会重试 - 失败的记录会立即发送给恢复器。
对于可重试异常,在重试次数耗尽后调用恢复程序。
/**
* Add exception types to the default list. By default, the following exceptions will
* not be retried:
* <ul>
* <li>{@link DeserializationException}</li>
* <li>{@link MessageConversionException}</li>
* <li>{@link ConversionException}</li>
* <li>{@link MethodArgumentResolutionException}</li>
* <li>{@link NoSuchMethodException}</li>
* <li>{@link ClassCastException}</li>
* </ul>
* All others will be retried.
* @param exceptionTypes the exception types.
* @see #removeNotRetryableException(Class)
* @see #setClassifications(Map, boolean)
*/
public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
Based on my understanding, it creates topic with <original_topic_name>.DLT.
这是默认行为;您可以提供自己的 DLT 主题名称策略(目标解析器)。
The following example shows how to wire a custom destination resolver:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
}
else {
return new TopicPartition(r.topic() + ".other.failures", r.partition());
}
});
ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));
我正在尝试在 spring-kafka 中编写 Kafka 消费者应用程序。我可以想到 2 种可能发生错误的情况:
- 处理记录时,服务层可能发生异常(在table中通过API更新记录)
- 反序列化错误
我已经探索了处理 场景 1 的选项,我可以在我的代码中抛出异常并使用 SeekToCurrentErrorHandler
.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 2L)));
return factory;
}
对于场景 2,我有一个选项 ErrorHandlingDeserializer
,但我不确定如何使用 SeekToCurrentErrorHandler
来实现它。有没有办法使用 SeekToCurrentErrorHandler 为这两种情况包含 ErrorHandler。
我的属性class如下:
@Bean
public ConsumerFactory<String, String> consumerFactory(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_BROKERS);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL_PROTOCOL);
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,SSL_TRUSTSTORE_LOCATION_FILE_NAME);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_SECURE);
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,SSL_KEYSTORE_LOCATION_FILE_NAME);
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_SECURE);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, SSL_KEY_SECURE);
return new DefaultKafkaConsumerFactory<>(props);
}
发布错误记录:
我也在考虑将错误记录发布到死信队列。对于场景 1,它应该重试并在死信队列上发布,对于场景 2,它应该直接发布,因为重试没有任何好处。我可能无法自己创建一个主题,并且需要让我的生产者也为错误记录创建一个主题。如何实现逻辑以在自定义错误主题上发布记录。
因为如果我使用 DeadLetterPublishingRecoverer
,我无法控制名称。根据我的理解,它使用
SeekToCurrentErrorHandler
将某些异常(例如 DeserializationException
)视为致命异常并且不会重试 - 失败的记录会立即发送给恢复器。
对于可重试异常,在重试次数耗尽后调用恢复程序。
/**
* Add exception types to the default list. By default, the following exceptions will
* not be retried:
* <ul>
* <li>{@link DeserializationException}</li>
* <li>{@link MessageConversionException}</li>
* <li>{@link ConversionException}</li>
* <li>{@link MethodArgumentResolutionException}</li>
* <li>{@link NoSuchMethodException}</li>
* <li>{@link ClassCastException}</li>
* </ul>
* All others will be retried.
* @param exceptionTypes the exception types.
* @see #removeNotRetryableException(Class)
* @see #setClassifications(Map, boolean)
*/
public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
Based on my understanding, it creates topic with <original_topic_name>.DLT.
这是默认行为;您可以提供自己的 DLT 主题名称策略(目标解析器)。
The following example shows how to wire a custom destination resolver:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
}
else {
return new TopicPartition(r.topic() + ".other.failures", r.partition());
}
});
ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));