Spring 云流使用 Kafka DLT 处理毒丸

Spring cloud stream handling poison pills with Kafka DLT

我按照这个方法来处理反序列化错误:https://github.com/spring-cloud/spring-cloud-stream-samples/blob/main/recipes/recipe-3-handling-deserialization-errors-dlq-kafka.adoc

我创建了上面食谱中提到的豆子:

Configuration
@Slf4j
public class ErrorHandlingConfig {

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
        return (container, dest, group) -> {
            container.setErrorHandler(errorHandler);
        };
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
    }

    @Bean
    public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
        return new DeadLetterPublishingRecoverer(bytesTemplate);
    }
}

配置文件:

spring:
  cloud:
    stream:
      default:
        producer:
          useNativeEncoding: true
        consumer:
          useNativeDecoding: true
      bindings:
        myInboundRoute:
          destination: some-destination.1
          group: a-custom-group
        myOutboundRoute:
          destination: some-destination.2
      kafka:
        binder:
          brokers: localhost
          defaultBrokerPort: 9092
          configuration:
            application:
              security: PLAINTEXT
        bindings:
          myInboundRoute:
            consumer:
              autoCommitOffset: true
              startOffset: latest
              enableDlq: true
              dlqName: my-dql.poison
              dlqProducerProperties:
                configuration:
                  value.serializer: myapp.serde.MyCustomSerializer
              configuration:
                  value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                  spring.deserializer.value.delegate.class: myapp.serde.MyCustomSerializer
          myOutboundRoute:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: myapp.serde.MyCustomSerializer

我期待 DLT 被称为 my-dql.poison。该主题实际上创建得很好,但是我还自动创建了第二个主题 some-destination.1.DLT 为什么它会创建这个以及我在配置中用 dlqName 命名的那个?

我做错了什么?当我轮询消息时,消息在自动创建的 some-destination.1.DLT 中,而不是我的 dlqName

  1. 如果您在容器中配置了 STCEH,则不应在绑定中配置 dlt 处理。同时设置 maxAttempts=1 以禁用那里的重试。

  2. 您需要在 DLPR 中配置目标解析器以使用不同的名称。

    /**
     * Create an instance with the provided template and destination resolving function,
     * that receives the failed consumer record and the exception and returns a
     * {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
     * 0, no partition is set when publishing to the topic.
     * @param template the {@link KafkaOperations} to use for publishing.
     * @param destinationResolver the resolving function.
     */
    public DeadLetterPublishingRecoverer(KafkaOperations<? extends Object, ? extends Object> template,
            BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
        this(Collections.singletonMap(Object.class, template), destinationResolver);
    }

https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters

使用绑定的 DLT 名称配置 DLPR 存在未决问题。

https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1031