Handle deserialisation errors and other exceptions separately
Using spring-cloud-stream and the Kafka binder from the spring-cloud Hoxton.SR12 release. Boot version: 2.5.2
Handle any other exception by retrying and then pushing to a parkingLot topic.
Do not retry ValidationException.
@Configuration
public class ErrorHandlingConfig {
    // Values are injected from configuration (injection annotations not shown).
    private String parkingLotDestination;
    private long retryAttempts;
    private long retryIntervalMillis;

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
        // The lambda body was lost in the post; presumably it installs the handler on each container.
        return (container, dest, group) -> container.setErrorHandler(errorHandler);
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer parkingLotPublisher) {
        SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(parkingLotPublisher, new FixedBackOff(retryIntervalMillis, retryAttempts));
        return seekToCurrentErrorHandler;
    }

    @Bean
    public DeadLetterPublishingRecoverer parkingLotPublisher(KafkaOperations bytesTemplate) {
        DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(bytesTemplate, (cr, e) -> new TopicPartition(parkingLotDestination, cr.partition()));
        deadLetterPublishingRecoverer.setHeadersFunction((cr, e) -> cr.headers());
        return deadLetterPublishingRecoverer;
    }
}
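Because of an outstanding issue with sending to a custom dlqName, I want to do this outside the binder/binding configuration, at the container level.
I could use an ErrorHandlingDeserializer and call setFailedDeserializationFunction() on it with a function that sends the message to the poison topic. Should I do this with a Source binding or with raw KafkaOperations? I also need to work out how to hook this ErrorHandlingDeserializer into the ConsumerFactory.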
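Why are you using Hoxton with Boot 2.5? The correct cloud version for Boot 2.5.2 is 2020.0.3.
The SeekToCurrentErrorHandler already treats DeserializationException as not retryable. See the Javadoc of addNotRetryableExceptions: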
/**
 * Add exception types to the default list. By default, the following exceptions will
 * not be retried:
 * <ul>
 * <li>{@link DeserializationException}</li>
 * <li>{@link MessageConversionException}</li>
 * <li>{@link ConversionException}</li>
 * <li>{@link MethodArgumentResolutionException}</li>
 * <li>{@link NoSuchMethodException}</li>
 * <li>{@link ClassCastException}</li>
 * </ul>
 * All others will be retried.
 * @param exceptionTypes the exception types.
 * @since 2.6
 * @see #removeNotRetryableException(Class)
 * @see #setClassifications(Map, boolean)
 */
public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) { ... }
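To make the classification concrete, here is a small plain-Java model of the decision that Javadoc describes. It is a sketch, not spring-kafka's implementation: the class RetryClassifier, its isRetryable method and the stand-in ValidationException are hypothetical names introduced for illustration, and only the two JDK types from the default list are registered. With the real handler from the question, the corresponding step would presumably be a call to addNotRetryableExceptions(ValidationException.class) on the SeekToCurrentErrorHandler before it is returned.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class RetryClassifier {

    /** Hypothetical stand-in for the application's ValidationException. */
    public static class ValidationException extends RuntimeException {
        public ValidationException(String message) {
            super(message);
        }
    }

    // Types that skip retries and go straight to the recoverer; all others are retried.
    private final Set<Class<? extends Exception>> notRetryable = new LinkedHashSet<>();

    public RetryClassifier() {
        // Only the two JDK types from the default list; the Spring types are left out
        // so that this sketch has no dependencies.
        notRetryable.add(NoSuchMethodException.class);
        notRetryable.add(ClassCastException.class);
    }

    @SafeVarargs
    public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
        for (Class<? extends Exception> type : exceptionTypes) {
            notRetryable.add(type);
        }
    }

    /** Returns false if the failure, or any exception in its cause chain, is registered as not retryable. */
    public boolean isRetryable(Throwable failure) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            for (Class<? extends Exception> type : notRetryable) {
                if (type.isInstance(t)) {
                    return false;
                }
            }
        }
        return true;
    }
}
```

In this model a ValidationException is retried until it is registered; after registration it is treated like the default fatal types and would go directly to the recoverer.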
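The ErrorHandlingDeserializer (without a function) adds the exception to a header; the DeadLetterPublishingRecoverer automatically extracts the original payload from the header and sets it as the value() of the outgoing record (byte[]).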
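Since you are using native encoding, you will need two KafkaTemplates: one for failed records that need to be re-serialized and another for DeserializationExceptions, as this constructor's Javadoc describes: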
/**
 * Create an instance with the provided templates and destination resolving function,
 * that receives the failed consumer record and the exception and returns a
 * {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
 * 0, no partition is set when publishing to the topic. The templates map keys are
 * classes and the value the corresponding template to use for objects (producer
 * record values) of that type. A {@link java.util.LinkedHashMap} is recommended when
 * there is more than one template, to ensure the map is traversed in order. To send
 * records with a null value, add a template with the {@link Void} class as a key;
 * otherwise the first template from the map values iterator will be used.
 * @param templates the {@link KafkaOperations}s to use for publishing.
 * @param destinationResolver the resolving function.
 */
public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates,
        BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) { ... }
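The lookup rules in that Javadoc can be modelled in a few lines of plain Java, which also shows why an ordered map is recommended. This is only an illustrative sketch: TemplateSelection, select and exampleTemplates are hypothetical names, the strings stand in for KafkaOperations instances, and falling back to the first entry when no key matches is an assumption of this model.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TemplateSelection {

    /**
     * Picks the entry used to publish a record value. T stands in for a KafkaOperations instance.
     * Throws NoSuchElementException if the map is empty.
     */
    public static <T> T select(Map<Class<?>, T> templates, Object value) {
        if (value == null) {
            // Null values use the entry keyed by Void, else the first entry in iteration order.
            T forNull = templates.get(Void.class);
            return forNull != null ? forNull : templates.values().iterator().next();
        }
        // Traverse in map order; the first key that is a supertype of the value wins.
        for (Map.Entry<Class<?>, T> entry : templates.entrySet()) {
            if (entry.getKey().isAssignableFrom(value.getClass())) {
                return entry.getValue();
            }
        }
        // Assumption of this sketch: with no matching key, use the first entry.
        return templates.values().iterator().next();
    }

    /** Two entries: raw bytes first, then a catch-all for values that must be re-serialized. */
    public static Map<Class<?>, String> exampleTemplates() {
        Map<Class<?>, String> templates = new LinkedHashMap<>();
        templates.put(byte[].class, "bytesTemplate");
        templates.put(Object.class, "reserializingTemplate");
        return templates;
    }
}
```

Because Object is a supertype of byte[], the catch-all entry has to come after the byte[] entry; with an unordered map the byte[] payload of a DeserializationException could be routed to the wrong template.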
Regarding "I also need to work out how to hook this ErrorHandlingDeserializer into the ConsumerFactory": just set the appropriate properties; see the documentation.
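As a rough sketch of what "the appropriate properties" look like, the map below lists the consumer configuration keys that ErrorHandlingDeserializer reads to find its delegates. The class and method names are hypothetical, and JsonDeserializer as the value delegate is an assumption; substitute whatever deserializer the application really uses.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DeserializerProperties {

    private static final String ERROR_HANDLING_DESERIALIZER =
            "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer";

    /** Consumer properties that wrap the real deserializers in ErrorHandlingDeserializer. */
    public static Map<String, Object> consumerProperties() {
        Map<String, Object> props = new LinkedHashMap<>();
        // Kafka instantiates the wrapper for both key and value.
        props.put("key.deserializer", ERROR_HANDLING_DESERIALIZER);
        props.put("value.deserializer", ERROR_HANDLING_DESERIALIZER);
        // The wrapper reads these keys to locate the delegates that do the actual work.
        props.put("spring.deserializer.key.delegate.class",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Assumption: JSON payloads; replace with the application's own value deserializer.
        props.put("spring.deserializer.value.delegate.class",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        return props;
    }
}
```

With the Kafka binder these keys would normally be supplied through the binder's or binding's consumer configuration properties rather than a hand-built map, so no custom ConsumerFactory should be needed.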