How can we use DeadLetterPublishingRecoverer with RetryTemplate?

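I want to use RetryTemplate together with DeadLetterPublishingRecoverer.

How can I set this up so that the retry count and retryInterval are taken from the RetryTemplate, and the record is moved to the DLQ once the retries are exhausted?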

@Bean
public RetryTemplate retryTemplate(){
    RetryTemplate retryTemplate = new RetryTemplate();
    SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
    simpleRetryPolicy.setMaxAttempts(retryMaxAttempts);
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(retryInterval);
    retryTemplate.setRetryPolicy(simpleRetryPolicy);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    return retryTemplate;
}


@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(ChainedKafkaTransactionManager<String, String> chainedTM) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(concurrency);
    factory.getContainerProperties().setPollTimeout(pollTimeout);
    factory.getContainerProperties().setSyncCommits(true);
    factory.setRetryTemplate(retryTemplate());
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setTransactionManager(chainedTM);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 1));
    return factory;
}

You should do the recovery (publishing) in the retry logic rather than in the error handler:

        factory.setRecoveryCallback(context -> {
            recoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"),
                    (Exception) context.getLastThrowable());
            return null;
        });

where recoverer is the DeadLetterPublishingRecoverer.

EDIT
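To make the flow concrete, here is a minimal plain-Java model of what happens when recovery lives in the retry logic: the listener is attempted up to a maximum number of times with a fixed pause between attempts, and the recoverer is called only after the last attempt fails. This is a simplified sketch, not Spring Retry's actual implementation. `RetryThenRecoverSketch`, `process`, and `pause` are hypothetical names, and `maxAttempts` and `backOffMillis` stand in for `retryMaxAttempts` and `retryInterval` from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical plain-Java model of retry-then-recover; not the Spring Retry code. */
class RetryThenRecoverSketch {

    /**
     * Calls the listener up to maxAttempts times, pausing backOffMillis between
     * attempts. If every attempt fails, passes the record and the last exception
     * to the recoverer. Returns the number of attempts that were made.
     */
    static <R> int process(R record, int maxAttempts, long backOffMillis,
            Consumer<R> listener, BiConsumer<R, Exception> recoverer) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.accept(record);
                return attempt; // success: the recoverer is never called
            } catch (RuntimeException e) {
                last = e;
            }
            if (attempt < maxAttempts) {
                pause(backOffMillis); // fixed back-off, skipped after the final attempt
            }
        }
        // Retries exhausted: this is the point where the record would be published to the DLT.
        recoverer.accept(record, last);
        return maxAttempts;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during back-off", ie);
        }
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        int attempts = process("bad-record", 3, 10L,
                r -> { throw new IllegalStateException("boom"); },
                (r, e) -> deadLetters.add(r + " -> " + e.getMessage()));
        System.out.println(attempts + " attempts, dead letters: " + deadLetters);
    }
}
```

In the real configuration the recoverer role is played by the DeadLetterPublishingRecoverer invoked from the recovery callback, so a record reaches the DLT only after the RetryTemplate has used up its attempts.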

/**
 * Create an instance with the provided template and destination resolving function,
 * that receives the failed consumer record and the exception and returns a
 * {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
 * 0, no partition is set when publishing to the topic.
 * @param template the {@link KafkaTemplate} to use for publishing.
 * @param destinationResolver the resolving function.
 */
public DeadLetterPublishingRecoverer(KafkaTemplate<? extends Object, ? extends Object> template,
        BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
    this(Collections.singletonMap(Object.class, template), destinationResolver);
}

If the DLT does not have as many partitions as the original topic, you need a custom destination resolver:

(record, exception) -> new TopicPartition("my.DLT", -1)

With a negative partition, Kafka chooses the partition; the default resolver uses the same partition as the original record.

DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());

This is explained in the [documentation](https://docs.spring.io/spring-kafka/docs/2.2.7.RELEASE/reference/html/#dead-letters):

You can also, optionally, configure it with a BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>, which is called to resolve the destination topic and partition. By default, the dead-letter record is sent to a topic named <originalTopic>.DLT (the original topic name suffixed with .DLT) and to the same partition as the original record. Therefore, when you use the default resolver, the dead-letter topic must have at least as many partitions as the original topic. If the returned TopicPartition has a negative partition, the partition is not set in the ProducerRecord, so the partition is selected by Kafka.
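The partition rule described above can be modeled in a few lines of plain Java. This is only an illustrative sketch: `Rec` and `Dest` are hypothetical stand-ins for `ConsumerRecord` and `TopicPartition` so that the example runs without Kafka on the classpath, and `partitionToSend` is an assumed helper that mirrors the documented behaviour (a negative partition means "leave the partition unset").

```java
import java.util.function.BiFunction;

/** Hypothetical model of the destination-resolver rule; uses stand-in types, not Kafka classes. */
class DestinationResolverSketch {

    /** Stand-in for ConsumerRecord: only the topic and partition matter here. */
    static final class Rec {
        final String topic;
        final int partition;
        Rec(String topic, int partition) { this.topic = topic; this.partition = partition; }
    }

    /** Stand-in for TopicPartition. */
    static final class Dest {
        final String topic;
        final int partition;
        Dest(String topic, int partition) { this.topic = topic; this.partition = partition; }
    }

    /** Same shape as the default resolver: "<topic>.DLT", same partition as the failed record. */
    static final BiFunction<Rec, Exception, Dest> DEFAULT_RESOLVER =
            (r, e) -> new Dest(r.topic + ".DLT", r.partition);

    /** Custom resolver for a DLT with fewer partitions: negative partition, broker side decides. */
    static final BiFunction<Rec, Exception, Dest> ANY_PARTITION_RESOLVER =
            (r, e) -> new Dest("my.DLT", -1);

    /** Returns the partition to set on the outgoing record, or null to leave it unset. */
    static Integer partitionToSend(Dest dest) {
        return dest.partition < 0 ? null : dest.partition;
    }

    public static void main(String[] args) {
        Rec failed = new Rec("orders", 5);
        Exception cause = new RuntimeException("processing failed");
        Dest byDefault = DEFAULT_RESOLVER.apply(failed, cause);
        Dest custom = ANY_PARTITION_RESOLVER.apply(failed, cause);
        System.out.println(byDefault.topic + " partition " + partitionToSend(byDefault));
        System.out.println(custom.topic + " partition " + partitionToSend(custom));
    }
}
```

With the default resolver, a record from partition 5 requires the DLT to have at least 6 partitions; the custom resolver removes that requirement by never naming a partition.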