Kafka 流不会在反序列化错误时重试
Kafka stream does not retry on deserialisation error
Spring 即使在特定配置之后,云 Kafka 流也不会在反序列化错误时重试。预期是,它应该根据配置的重试策略进行重试,并在最后将失败的消息推送到 DLQ。
配置如下。
spring.cloud.stream.bindings.input_topic.consumer.maxAttempts=7
spring.cloud.stream.bindings.input_topic.consumer.backOffInitialInterval=500
spring.cloud.stream.bindings.input_topic.consumer.backOffMultiplier=10.0
spring.cloud.stream.bindings.input_topic.consumer.backOffMaxInterval=100000
spring.cloud.stream.bindings.iinput_topic.consumer.defaultRetryable=true
public interface MyStreams {
String INPUT_TOPIC = "input_topic";
String INPUT_TOPIC2 = "input_topic2";
String ERROR = "apperror";
String OUTPUT = "output";
@Input(INPUT_TOPIC)
KStream<String, InObject> inboundTopic();
@Input(INPUT_TOPIC2)
KStream<Object, InObject> inboundTOPIC2();
@Output(OUTPUT)
KStream<Object, outObject> outbound();
@Output(ERROR)
MessageChannel outboundError();
}
@StreamListener(MyStreams.INPUT_TOPIC)
@SendTo(MyStreams.OUTPUT)
public KStream<Key, outObject> processSwft(KStream<Key, InObject> myStream) {
return myStream.mapValues(this::transform);
}
KafkaTopicProvisioner.java 中的 metadataRetryOperations 始终为 null,因此它在 afterPropertiesSet()
.
中创建了一个新的 RetryTemplate
public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, KafkaProperties kafkaProperties) {
Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null");
this.adminClientProperties = kafkaProperties.buildAdminProperties();
this.configurationProperties = kafkaBinderConfigurationProperties;
this.normalalizeBootPropsWithBinder(this.adminClientProperties, kafkaProperties, kafkaBinderConfigurationProperties);
}
public void setMetadataRetryOperations(RetryOperations metadataRetryOperations) {
this.metadataRetryOperations = metadataRetryOperations;
}
public void afterPropertiesSet() throws Exception {
if (this.metadataRetryOperations == null) {
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(10);
retryTemplate.setRetryPolicy(simpleRetryPolicy);
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(100L);
backOffPolicy.setMultiplier(2.0D);
backOffPolicy.setMaxInterval(1000L);
retryTemplate.setBackOffPolicy(backOffPolicy);
this.metadataRetryOperations = retryTemplate;
}
}
Spring cloud Kafka stream does not retry upon deserialization error even after specific configuration.
您看到的行为与 Kafka Streams 在遇到反序列化错误时的默认设置相匹配。
LogAndFailExceptionHandler
implements DeserializationExceptionHandler
and is the default setting in Kafka Streams. It handles any encountered deserialization exceptions by logging the error and throwing a fatal error to stop your Streams application. If your application is configured to use LogAndFailExceptionHandler
, then an instance of your application will fail-fast when it encounters a corrupted record by terminating itself.
我不熟悉 Spring 的 Kafka Streams 外观,但您可能需要配置所需的 org.apache.kafka.streams.errors.DeserializationExceptionHandler
,而不是配置重试(它们用于不同的目的)。或者,您可能想要实现自己的自定义处理程序(有关详细信息,请参阅上面的 link),然后配置 Spring/KStreams 以使用它。
重试配置仅适用于基于 MessageChannel
的绑定器。使用 KStream 绑定器,Spring 只是帮助以规定的方式构建拓扑,一旦构建了拓扑,它就不会参与消息流。
下一版本spring-kafka
(活页夹使用)添加了RecoveringDeserializationExceptionHandler
(commit here);虽然它不能帮助重试,但它可以与 DeadLetterPublishingRecoverer
一起使用,将记录发送到 dead-letter 主题。
您可以在 processors/transformers 中使用 RetryTemplate
来重试特定操作。
Spring 即使在特定配置之后,云 Kafka 流也不会在反序列化错误时重试。预期是,它应该根据配置的重试策略进行重试,并在最后将失败的消息推送到 DLQ。
配置如下。
spring.cloud.stream.bindings.input_topic.consumer.maxAttempts=7
spring.cloud.stream.bindings.input_topic.consumer.backOffInitialInterval=500
spring.cloud.stream.bindings.input_topic.consumer.backOffMultiplier=10.0
spring.cloud.stream.bindings.input_topic.consumer.backOffMaxInterval=100000
spring.cloud.stream.bindings.iinput_topic.consumer.defaultRetryable=true
public interface MyStreams {
String INPUT_TOPIC = "input_topic";
String INPUT_TOPIC2 = "input_topic2";
String ERROR = "apperror";
String OUTPUT = "output";
@Input(INPUT_TOPIC)
KStream<String, InObject> inboundTopic();
@Input(INPUT_TOPIC2)
KStream<Object, InObject> inboundTOPIC2();
@Output(OUTPUT)
KStream<Object, outObject> outbound();
@Output(ERROR)
MessageChannel outboundError();
}
@StreamListener(MyStreams.INPUT_TOPIC)
@SendTo(MyStreams.OUTPUT)
public KStream<Key, outObject> processSwft(KStream<Key, InObject> myStream) {
return myStream.mapValues(this::transform);
}
KafkaTopicProvisioner.java 中的 metadataRetryOperations 始终为 null,因此它在 afterPropertiesSet()
.
public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, KafkaProperties kafkaProperties) {
Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null");
this.adminClientProperties = kafkaProperties.buildAdminProperties();
this.configurationProperties = kafkaBinderConfigurationProperties;
this.normalalizeBootPropsWithBinder(this.adminClientProperties, kafkaProperties, kafkaBinderConfigurationProperties);
}
public void setMetadataRetryOperations(RetryOperations metadataRetryOperations) {
this.metadataRetryOperations = metadataRetryOperations;
}
public void afterPropertiesSet() throws Exception {
if (this.metadataRetryOperations == null) {
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(10);
retryTemplate.setRetryPolicy(simpleRetryPolicy);
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(100L);
backOffPolicy.setMultiplier(2.0D);
backOffPolicy.setMaxInterval(1000L);
retryTemplate.setBackOffPolicy(backOffPolicy);
this.metadataRetryOperations = retryTemplate;
}
}
Spring cloud Kafka stream does not retry upon deserialization error even after specific configuration.
您看到的行为与 Kafka Streams 在遇到反序列化错误时的默认设置相匹配。
LogAndFailExceptionHandler
implementsDeserializationExceptionHandler
and is the default setting in Kafka Streams. It handles any encountered deserialization exceptions by logging the error and throwing a fatal error to stop your Streams application. If your application is configured to useLogAndFailExceptionHandler
, then an instance of your application will fail-fast when it encounters a corrupted record by terminating itself.
我不熟悉 Spring 的 Kafka Streams 外观,但您可能需要配置所需的 org.apache.kafka.streams.errors.DeserializationExceptionHandler
,而不是配置重试(它们用于不同的目的)。或者,您可能想要实现自己的自定义处理程序(有关详细信息,请参阅上面的 link),然后配置 Spring/KStreams 以使用它。
重试配置仅适用于基于 MessageChannel
的绑定器。使用 KStream 绑定器,Spring 只是帮助以规定的方式构建拓扑,一旦构建了拓扑,它就不会参与消息流。
下一版本spring-kafka
(活页夹使用)添加了RecoveringDeserializationExceptionHandler
(commit here);虽然它不能帮助重试,但它可以与 DeadLetterPublishingRecoverer
一起使用,将记录发送到 dead-letter 主题。
您可以在 processors/transformers 中使用 RetryTemplate
来重试特定操作。