Destination resolver returned non-existent partition
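I am using Spring-Kafka to consume messages from Confluent Kafka, and I am using a RetryTopicConfiguration bean to configure the topics and the backoff strategy. My application works fine, but I see a lot of warnings in the logs like the one below, and I am wondering whether my configuration is incorrect.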
DeadLetterPublishingRecovererFactory : Destination resolver returned non-existent partition flow-events-retry-0-4, KafkaProducer will determine partition to use for this topic
Configuration code:
@Bean
public KafkaTemplate kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public RetryTopicConfiguration myRetryableTopic(KafkaTemplate<String, Object> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(BACKOFF_INITIAL_DELAY_10MINS, BACKOFF_EXPONENTIAL_MULTIPLIER_3, BACKOFF_MAX_DELAY_4HRS)
            .maxAttempts(5)
            .doNotAutoCreateRetryTopics()
            .setTopicSuffixingStrategy(TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
            .create(template);
}
The retry topics are created separately, with 1 partition and a replication factor of 3.
By default, the same partition as in the original topic is used; you can override that behavior by overriding the DeadLetterPublishingRecovererFactory @Bean:
@Bean(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver) {
    DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(resolver) {

        @Override
        protected TopicPartition resolveTopicPartition(ConsumerRecord<?, ?> cr, DestinationTopic nextDestination) {
            return new TopicPartition(nextDestination.getDestinationName(), -1); // Kafka chooses
            // return new TopicPartition(nextDestination.getDestinationName(), 0); // explicit
        }

    };
    factory.setDeadLetterPublishingRecovererCustomizer(dlpr -> {
        // ...
    });
    return factory;
}
As you can see in this example, you can also customize DLPR properties there.
/**
 * Creates and returns the {@link TopicPartition}, where the original record should be forwarded.
 * By default, it will use the partition same as original record's partition, in the next destination topic.
 *
 * <p>{@link DeadLetterPublishingRecoverer#checkPartition} has logic to check whether that partition exists,
 * and if it doesn't it sets -1, to allow the Producer itself to assign a partition to the record.</p>
 *
 * <p>Subclasses can inherit from this method to override the implementation, if necessary.</p>
 *
 * @param cr The original {@link ConsumerRecord}, which is to be forwarded to DLT
 * @param nextDestination The next {@link DestinationTopic}, where the consumerRecord is to be forwarded
 * @return An instance of {@link TopicPartition}, specifying the topic and partition, where the cr is to be sent
 */
protected TopicPartition resolveTopicPartition(final ConsumerRecord<?, ?> cr, final DestinationTopic nextDestination) {
    return new TopicPartition(nextDestination.getDestinationName(), cr.partition());
}
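To make the fallback described in that Javadoc concrete, here is a minimal stand-alone sketch of the idea. It is not the Spring Kafka source: the class name PartitionFallbackSketch, the method signature, and the PRODUCER_CHOOSES constant are made up for illustration, and the partition count is passed in directly instead of being looked up from the broker. It assumes the situation the log line suggests, namely a record read from partition 4 of the source topic and a retry topic with a single partition.

```java
import java.util.List;

// Simplified model of the "does the partition exist?" fallback.
// Hypothetical names throughout; this is not Spring Kafka code.
class PartitionFallbackSketch {

    // Sentinel used in this sketch for "let the producer pick the partition".
    static final int PRODUCER_CHOOSES = -1;

    // Returns the requested partition if the destination topic has it,
    // otherwise the sentinel so the producer can assign one itself.
    static int checkPartition(int requestedPartition, int destinationPartitionCount) {
        boolean exists = requestedPartition >= 0
                && requestedPartition < destinationPartitionCount;
        return exists ? requestedPartition : PRODUCER_CHOOSES;
    }

    public static void main(String[] args) {
        int retryTopicPartitions = 1; // the retry topic in the question has one partition
        for (int sourcePartition : List.of(0, 4)) {
            int resolved = checkPartition(sourcePartition, retryTopicPartitions);
            System.out.println("source partition " + sourcePartition + " -> " + resolved);
        }
    }
}
```

Under these assumptions, a record from partition 0 keeps its partition, while a record from partition 4 falls back to -1, which is the case that produces the warning. Returning -1 (or 0) up front from resolveTopicPartition, as in the override above, avoids reaching that fallback at all.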