Spring 云流使用 Kafka DLT 处理毒丸
Spring cloud stream handling poison pills with Kafka DLT
- spring-boot 2.5.2
- spring-cloud Hoxton.SR12
- spring-kafka 2.6.7(因问题降级:https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1079)
我按照这个方法来处理反序列化错误:https://github.com/spring-cloud/spring-cloud-stream-samples/blob/main/recipes/recipe-3-handling-deserialization-errors-dlq-kafka.adoc
我创建了上面食谱中提到的豆子:
Configuration
@Slf4j
public class ErrorHandlingConfig {
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
return (container, dest, group) -> {
container.setErrorHandler(errorHandler);
};
}
@Bean
public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
return new DeadLetterPublishingRecoverer(bytesTemplate);
}
}
配置文件:
spring:
cloud:
stream:
default:
producer:
useNativeEncoding: true
consumer:
useNativeDecoding: true
bindings:
myInboundRoute:
destination: some-destination.1
group: a-custom-group
myOutboundRoute:
destination: some-destination.2
kafka:
binder:
brokers: localhost
defaultBrokerPort: 9092
configuration:
application:
security: PLAINTEXT
bindings:
myInboundRoute:
consumer:
autoCommitOffset: true
startOffset: latest
enableDlq: true
dlqName: my-dql.poison
dlqProducerProperties:
configuration:
value.serializer: myapp.serde.MyCustomSerializer
configuration:
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class: myapp.serde.MyCustomSerializer
myOutboundRoute:
producer:
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: myapp.serde.MyCustomSerializer
我期待 DLT 被称为 my-dql.poison
。该主题实际上创建得很好,但是我还自动创建了第二个主题 some-destination.1.DLT
为什么它会创建这个以及我在配置中用 dlqName
命名的那个?
我做错了什么?当我轮询消息时,消息在自动创建的 some-destination.1.DLT
中,而不是我的 dlqName
如果您在容器中配置了 STCEH,则不应在绑定中配置 dlt 处理。同时设置 maxAttempts=1
以禁用那里的重试。
您需要在 DLPR 中配置目标解析器以使用不同的名称。
/**
* Create an instance with the provided template and destination resolving function,
* that receives the failed consumer record and the exception and returns a
* {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
* 0, no partition is set when publishing to the topic.
* @param template the {@link KafkaOperations} to use for publishing.
* @param destinationResolver the resolving function.
*/
public DeadLetterPublishingRecoverer(KafkaOperations<? extends Object, ? extends Object> template,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
this(Collections.singletonMap(Object.class, template), destinationResolver);
}
见https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters
使用绑定的 DLT 名称配置 DLPR 存在未决问题。
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1031
- spring-boot 2.5.2
- spring-cloud Hoxton.SR12
- spring-kafka 2.6.7(因问题降级:https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1079)
我按照这个方法来处理反序列化错误:https://github.com/spring-cloud/spring-cloud-stream-samples/blob/main/recipes/recipe-3-handling-deserialization-errors-dlq-kafka.adoc
我创建了上面食谱中提到的豆子:
Configuration
@Slf4j
public class ErrorHandlingConfig {
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
return (container, dest, group) -> {
container.setErrorHandler(errorHandler);
};
}
@Bean
public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
return new DeadLetterPublishingRecoverer(bytesTemplate);
}
}
配置文件:
spring:
cloud:
stream:
default:
producer:
useNativeEncoding: true
consumer:
useNativeDecoding: true
bindings:
myInboundRoute:
destination: some-destination.1
group: a-custom-group
myOutboundRoute:
destination: some-destination.2
kafka:
binder:
brokers: localhost
defaultBrokerPort: 9092
configuration:
application:
security: PLAINTEXT
bindings:
myInboundRoute:
consumer:
autoCommitOffset: true
startOffset: latest
enableDlq: true
dlqName: my-dql.poison
dlqProducerProperties:
configuration:
value.serializer: myapp.serde.MyCustomSerializer
configuration:
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class: myapp.serde.MyCustomSerializer
myOutboundRoute:
producer:
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: myapp.serde.MyCustomSerializer
我期待 DLT 被称为 my-dql.poison
。该主题实际上创建得很好,但是我还自动创建了第二个主题 some-destination.1.DLT
为什么它会创建这个以及我在配置中用 dlqName
命名的那个?
我做错了什么?当我轮询消息时,消息在自动创建的 some-destination.1.DLT
中,而不是我的 dlqName
如果您在容器中配置了 STCEH,则不应在绑定中配置 dlt 处理。同时设置
maxAttempts=1
以禁用那里的重试。您需要在 DLPR 中配置目标解析器以使用不同的名称。
/**
* Create an instance with the provided template and destination resolving function,
* that receives the failed consumer record and the exception and returns a
* {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
* 0, no partition is set when publishing to the topic.
* @param template the {@link KafkaOperations} to use for publishing.
* @param destinationResolver the resolving function.
*/
public DeadLetterPublishingRecoverer(KafkaOperations<? extends Object, ? extends Object> template,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
this(Collections.singletonMap(Object.class, template), destinationResolver);
}
见https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters
使用绑定的 DLT 名称配置 DLPR 存在未决问题。
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1031