Spring-Kafka RetryableTopic 导致 RecordTooLargeException
Spring-Kafka RetryableTopic causes RecordTooLargeException
我正在使用 spring-kafka RetryableTopic 进行 non-blocking 次重试,具有固定的 BackOff 和单个重试主题 (https://docs.spring.io/spring-kafka/reference/html/#single-topic-fixed-delay-retries)
我注意到当 retry-attempt 相对较高时我得到 RecordTooLargeException
,并且在检查消息时我看到它包含之前所有尝试的 Kafka headers,以及一些 headers 和 kafka_exception-stacktrace
一样,它们很重。
为什么它尝试发布重试消息与之前重试的headers?
我找不到任何配置。
这些 headers 能否以某种方式被操纵以在发布前将其剪掉?
好点;我打开了一个问题。
https://github.com/spring-projects/spring-kafka/issues/1994
在实施之前,我会看看是否可以想出一个解决方法。
编辑
public class HeaderStrippingInterceptor<K, V> implements ProducerInterceptor<K, V> {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
Header header = record.headers().lastHeader(KafkaHeaders.EXCEPTION_STACKTRACE);
if (header != null) {
record.headers().remove(KafkaHeaders.EXCEPTION_STACKTRACE);
record.headers().add(header);
}
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
然后将其 class 名称添加到生产者配置中;例如,使用 Spring Boot:
spring:
kafka:
producer:
properties:
"[interceptor.classes]": com.example.demo.HeaderStrippingInterceptor
我正在使用 spring-kafka RetryableTopic 进行 non-blocking 次重试,具有固定的 BackOff 和单个重试主题 (https://docs.spring.io/spring-kafka/reference/html/#single-topic-fixed-delay-retries)
我注意到当 retry-attempt 相对较高时我得到 RecordTooLargeException
,并且在检查消息时我看到它包含之前所有尝试的 Kafka headers,以及一些 headers 和 kafka_exception-stacktrace
一样,它们很重。
为什么它尝试发布重试消息与之前重试的headers? 我找不到任何配置。 这些 headers 能否以某种方式被操纵以在发布前将其剪掉?
好点;我打开了一个问题。
https://github.com/spring-projects/spring-kafka/issues/1994
在实施之前,我会看看是否可以想出一个解决方法。
编辑
public class HeaderStrippingInterceptor<K, V> implements ProducerInterceptor<K, V> {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
Header header = record.headers().lastHeader(KafkaHeaders.EXCEPTION_STACKTRACE);
if (header != null) {
record.headers().remove(KafkaHeaders.EXCEPTION_STACKTRACE);
record.headers().add(header);
}
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
然后将其 class 名称添加到生产者配置中;例如,使用 Spring Boot:
spring:
kafka:
producer:
properties:
"[interceptor.classes]": com.example.demo.HeaderStrippingInterceptor