如何用SeekToCurrentErrorHandler实现ErrorHandlingDeserializer并在自定义topic上发布错误记录

How to implement ErrorHandlingDeserializer with SeekToCurrentErrorHandler and publish error records on custom topic

我正在尝试在 spring-kafka 中编写 Kafka 消费者应用程序。我可以想到 2 种可能发生错误的情况:

我已经探索了处理 场景 1 的选项,我可以在我的代码中抛出异常并使用 SeekToCurrentErrorHandler.

处理它
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}

对于场景 2,我有一个选项 ErrorHandlingDeserializer,但我不确定如何使用 SeekToCurrentErrorHandler 来实现它。有没有办法使用 SeekToCurrentErrorHandler 为这两种情况包含 ErrorHandler。

我的属性class如下:

@Bean
public ConsumerFactory<String, String> consumerFactory(){
    
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_BROKERS);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL_PROTOCOL);
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,SSL_TRUSTSTORE_LOCATION_FILE_NAME);
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_SECURE);
    props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,SSL_KEYSTORE_LOCATION_FILE_NAME);
    props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_SECURE);
    props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, SSL_KEY_SECURE);
    
    return new DefaultKafkaConsumerFactory<>(props);
}

发布错误记录:

我也在考虑将错误记录发布到死信队列。对于场景 1,它应该重试并在死信队列上发布,对于场景 2,它应该直接发布,因为重试没有任何好处。我可能无法自己创建一个主题,并且需要让我的生产者也为错误记录创建一个主题。如何实现逻辑以在自定义错误主题上发布记录。 因为如果我使用 DeadLetterPublishingRecoverer,我无法控制名称。根据我的理解,它使用 .DLT.

创建主题

SeekToCurrentErrorHandler 将某些异常(例如 DeserializationException)视为致命异常并且不会重试 - 失败的记录会立即发送给恢复器。

对于可重试异常,在重试次数耗尽后调用恢复程序。

/**
 * Add exception types to the default list. By default, the following exceptions will
 * not be retried:
 * <ul>
 * <li>{@link DeserializationException}</li>
 * <li>{@link MessageConversionException}</li>
 * <li>{@link ConversionException}</li>
 * <li>{@link MethodArgumentResolutionException}</li>
 * <li>{@link NoSuchMethodException}</li>
 * <li>{@link ClassCastException}</li>
 * </ul>
 * All others will be retried.
 * @param exceptionTypes the exception types.
 * @see #removeNotRetryableException(Class)
 * @see #setClassifications(Map, boolean)
 */
public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {

Based on my understanding, it creates topic with <original_topic_name>.DLT.

这是默认行为;您可以提供自己的 DLT 主题名称策略(目标解析器)。

the documentation

The following example shows how to wire a custom destination resolver:

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (r, e) -> {
            if (e instanceof FooException) {
                return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
            }
            else {
                return new TopicPartition(r.topic() + ".other.failures", r.partition());
            }
        });
ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));