Handle deserialisation errors and other exceptions separately

Using spring-cloud-stream with the Kafka Binder from spring-cloud Hoxton.SR12. Boot version: 2.5.2.

Problem statement:

This is my error handling code so far:

    import javax.validation.ValidationException; // assuming javax.validation's ValidationException

    import org.apache.kafka.common.TopicPartition;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.KafkaOperations;
    import org.springframework.kafka.listener.AbstractMessageListenerContainer;
    import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
    import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
    import org.springframework.util.backoff.FixedBackOff;

    import lombok.extern.slf4j.Slf4j;

    @Configuration
    @Slf4j
    public class ErrorHandlingConfig {

        @Value("${errorHandling.parkingLotDestination}")
        private String parkingLotDestination;

        @Value("${errorHandling.retryAttempts}")
        private long retryAttempts;

        @Value("${errorHandling.retryIntervalMillis}")
        private long retryIntervalMillis;

        // Wire the error handler into every listener container the binder creates
        @Bean
        public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
            return (container, dest, group) -> container.setErrorHandler(errorHandler);
        }

        // Retry with a fixed back-off, then hand the record to the recoverer;
        // ValidationExceptions skip the retries and go straight to the parking lot
        @Bean
        public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer parkingLotPublisher) {
            SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(parkingLotPublisher, new FixedBackOff(retryIntervalMillis, retryAttempts));
            seekToCurrentErrorHandler.addNotRetryableExceptions(ValidationException.class);
            return seekToCurrentErrorHandler;
        }

        // Publish exhausted records to the parking lot topic, keeping the
        // original partition and headers
        @Bean
        public DeadLetterPublishingRecoverer parkingLotPublisher(KafkaOperations bytesTemplate) {
            DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(bytesTemplate, (cr, e) -> new TopicPartition(parkingLotDestination, cr.partition()));
            deadLetterPublishingRecoverer.setHeadersFunction((cr, e) -> cr.headers());
            return deadLetterPublishingRecoverer;
        }
    }

I think what I have so far should cover retryable exceptions ending up in the parking lot. How do I now add code to push failed deserialization events to a poison topic?

Because of the outstanding issue of being unable to send to a custom dlqName, I want to do this outside of the binder/binding configuration, at the container level.

I could use an ErrorHandlingDeserializer and call setFailedDeserializationFunction() on it with a function that sends the message on to the poison topic. Should I do that using a Source binding or raw KafkaOperations? I also need to work out how to hook this ErrorHandlingDeserializer into the ConsumerFactory.
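
Something like this sketch is what I have in mind (MyEvent, poisonTemplate and poisonTopic are placeholder names, not working code):

    // Hypothetical sketch: wrap the real deserializer and divert failures to a poison topic
    ErrorHandlingDeserializer<MyEvent> valueDeserializer =
            new ErrorHandlingDeserializer<>(new JsonDeserializer<>(MyEvent.class));
    valueDeserializer.setFailedDeserializationFunction(failedInfo -> {
        // failedInfo.getData() is the raw byte[] that could not be deserialized
        poisonTemplate.send(poisonTopic, failedInfo.getData());
        return null; // no replacement value for the listener
    });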

Why are you using Hoxton with Boot 2.5? The proper cloud version for Boot 2.5.2 is 2020.0.3.

The SeekToCurrentErrorHandler already considers DeserializationException to be fatal. See:

    /**
     * Add exception types to the default list. By default, the following exceptions will
     * not be retried:
     * <ul>
     * <li>{@link DeserializationException}</li>
     * <li>{@link MessageConversionException}</li>
     * <li>{@link ConversionException}</li>
     * <li>{@link MethodArgumentResolutionException}</li>
     * <li>{@link NoSuchMethodException}</li>
     * <li>{@link ClassCastException}</li>
     * </ul>
     * All others will be retried.
     * @param exceptionTypes the exception types.
     * @since 2.6
     * @see #removeNotRetryableException(Class)
     * @see #setClassifications(Map, boolean)
     */
    @SafeVarargs
    @SuppressWarnings("varargs")
    public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {

The ErrorHandlingDeserializer (without a function) adds the exception to a header; the DeadLetterPublishingRecoverer automatically extracts the original payload from the header and sets it as the value() of the outgoing (byte[]) record.

Since you are using native encoding, you will need two KafkaTemplates - one for failed records that need to be re-serialized and one for DeserializationExceptions (using a ByteArraySerializer). See this constructor:

    /**
     * Create an instance with the provided templates and destination resolving function,
     * that receives the failed consumer record and the exception and returns a
     * {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
     * 0, no partition is set when publishing to the topic. The templates map keys are
     * classes and the value the corresponding template to use for objects (producer
     * record values) of that type. A {@link java.util.LinkedHashMap} is recommended when
     * there is more than one template, to ensure the map is traversed in order. To send
     * records with a null value, add a template with the {@link Void} class as a key;
     * otherwise the first template from the map values iterator will be used.
     * @param templates the {@link KafkaOperations}s to use for publishing.
     * @param destinationResolver the resolving function.
     */
    @SuppressWarnings("unchecked")
    public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates,
            BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
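
A minimal sketch of wiring the two templates into the recoverer (the template parameter names and the MyEvent value type are assumptions; adapt them to your actual serialized type):

    // requires java.util.LinkedHashMap and java.util.Map
    @Bean
    public DeadLetterPublishingRecoverer parkingLotPublisher(
            KafkaOperations<byte[], MyEvent> reserializingTemplate, // serializer for your value type
            KafkaOperations<byte[], byte[]> bytesTemplate) {        // ByteArraySerializer for raw payloads

        // LinkedHashMap, as the javadoc recommends, so templates are matched in order
        Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
        templates.put(MyEvent.class, reserializingTemplate); // listener failures: value is the deserialized object
        templates.put(byte[].class, bytesTemplate);          // deserialization failures: value is the original byte[]

        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(templates,
                (cr, e) -> new TopicPartition(parkingLotDestination, cr.partition()));
        recoverer.setHeadersFunction((cr, e) -> cr.headers());
        return recoverer;
    }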

> I also need to work out how to hook this ErrorHandlingDeserializer into the ConsumerFactory.

Just set the appropriate properties - see the documentation.
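
For example, with the Kafka binder, something along these lines (the `input` binding name and the delegate deserializer class are placeholders):

    spring.cloud.stream.kafka.bindings.input.consumer.configuration.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
    spring.cloud.stream.kafka.bindings.input.consumer.configuration.spring.deserializer.value.delegate.class=com.example.MyEventDeserializer

With that in place, failed deserializations reach the error handler as DeserializationExceptions (which it already treats as fatal), and the recoverer can restore the original byte[] payload from the header, as described above.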