Spring-Kafka RetryableTopic 导致 RecordTooLargeException

Spring-Kafka RetryableTopic causes RecordTooLargeException

我正在使用 spring-kafka RetryableTopic 进行 non-blocking 次重试,具有固定的 BackOff 和单个重试主题 (https://docs.spring.io/spring-kafka/reference/html/#single-topic-fixed-delay-retries)

我注意到当 retry-attempt 相对较高时我得到 RecordTooLargeException,并且在检查消息时我看到它包含之前所有尝试的 Kafka headers,以及一些 headers 和 kafka_exception-stacktrace 一样,它们很重。

为什么它尝试发布重试消息与之前重试的headers? 我找不到任何配置。 这些 headers 能否以某种方式被操纵以在发布前将其剪掉?

好点;我打开了一个问题。

https://github.com/spring-projects/spring-kafka/issues/1994

在实施之前,我会看看是否可以想出一个解决方法。

编辑

public class HeaderStrippingInterceptor<K, V> implements ProducerInterceptor<K, V> {

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        Header header = record.headers().lastHeader(KafkaHeaders.EXCEPTION_STACKTRACE);
        if (header != null) {
            record.headers().remove(KafkaHeaders.EXCEPTION_STACKTRACE);
            record.headers().add(header);
        }
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

}

然后将其 class 名称添加到生产者配置中;例如,使用 Spring Boot:

spring:
  kafka:
    producer:
      properties:
        "[interceptor.classes]": com.example.demo.HeaderStrippingInterceptor