Force Spring Kafka not to create topics automatically, but to use already created ones

I want to implement a very simple case. I have a main topic and a DLT topic:

MessageBus:
  Topic: my_topic
  DltTopic: my_dlt_topic
  Broker: event-serv:9092

So these topics are already predefined, and I don't need them to be created automatically.

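The only thing I need is for broken messages to be handled automatically without retries, since retrying them makes no sense. So I have something like this: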

@KafkaListener(topics = ["#{config.messageBus.topic}"], groupId = "group_id")
@RetryableTopic(
    dltStrategy = DltStrategy.FAIL_ON_ERROR,
    autoCreateTopics = "false",
    attempts = "1"
)
@Throws(IOException::class)
fun consume(rawMessage: String?) {
    ...
}

@DltHandler
fun processMessage(rawMessage: String?) {
    kafkaTemplate.send(config.messageBus.dltTopic, rawMessage)
}

That, of course, doesn't work properly. I also tried specifying a kafkaTemplate:

@Bean
fun kafkaTemplate(
    config: Config,
    producerFactory: ProducerFactory<String, String>
): KafkaTemplate<String, String> {
    val template = KafkaTemplate(producerFactory)
    template.defaultTopic = config.messageBus.dltTopic
    return template
}

However, this did not change the situation.

I believe there is an obvious solution, so please give me a hint.

This is a Kafka server (broker) configuration, so you must set it on the server. The relevant property is:

  auto.create.topics.enable (true by default)
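
As a minimal sketch, assuming the broker is configured through the usual config/server.properties file (the file location depends on your installation), turning off broker-side auto-creation would look like this:

```properties
# Broker setting: do not create a topic when a client first references it.
# The default is true.
auto.create.topics.enable=false
```

This is separate from autoCreateTopics = "false" on @RetryableTopic, which only controls whether Spring itself tries to create the retry and DLT topics.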

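See the documentation.

To make @RetryableTopic use your existing topic names instead of the generated ones, you can supply a RetryTopicNamesProviderFactory bean, for example: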

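import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.DestinationTopic.Properties;
import org.springframework.kafka.retrytopic.DltStrategy;
import org.springframework.kafka.retrytopic.RetryTopicNamesProviderFactory;
import org.springframework.kafka.retrytopic.SuffixingRetryTopicNamesProviderFactory;

@SpringBootApplication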
public class So69317126Application {

    public static void main(String[] args) {
        SpringApplication.run(So69317126Application.class, args);
    }

    @RetryableTopic(attempts = "1", autoCreateTopics = "false", dltStrategy = DltStrategy.FAIL_ON_ERROR)
    @KafkaListener(id = "so69317126", topics = "so69317126")
    void listen(String in) {
        System.out.println(in);
        throw new RuntimeException();
    }

    @DltHandler
    void handler(String in) {
        System.out.println("DLT: " + in);
    }

    @Bean
    RetryTopicNamesProviderFactory namer() {
        return new RetryTopicNamesProviderFactory() {

            @Override
            public RetryTopicNamesProvider createRetryTopicNamesProvider(Properties properties) {
                if (properties.isMainEndpoint()) {
                    return new SuffixingRetryTopicNamesProviderFactory.SuffixingRetryTopicNamesProvider(properties) {

                        @Override
                        public String getTopicName(String topic) {
                            return "so69317126";
                        }

                    };
                }
                else if(properties.isDltTopic()) {
                    return new SuffixingRetryTopicNamesProviderFactory.SuffixingRetryTopicNamesProvider(properties) {

                        @Override
                        public String getTopicName(String topic) {
                            return "so69317126.DLT";
                        }

                    };
                }
                else {
                    throw new IllegalStateException("Shouldn't get here - attempts is only 1");
                }
            }

        };

    }

}
Result:

so69317126: partitions assigned: [so69317126-0]
so69317126-dlt: partitions assigned: [so69317126.DLT-0]
foo
DLT: foo
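
To see why the name override is needed: by default the framework derives the DLT name by appending a suffix ("-dlt" by default) to the main topic name, so my_topic would map to my_topic-dlt rather than the pre-created my_dlt_topic. The following is a plain-Java sketch of that mapping with no Spring dependency. The class TopicNameSketch and its methods are hypothetical names made up for illustration; they are not part of Spring Kafka.

```java
import java.util.Map;

// Hypothetical helper, not part of Spring Kafka: mimics suffix-based DLT naming
// and shows how a fixed-name override replaces it.
final class TopicNameSketch {

    // Same value as Spring Kafka's default DLT suffix.
    static final String DEFAULT_DLT_SUFFIX = "-dlt";

    // Default behaviour: DLT name = main topic name + suffix.
    static String defaultDltName(String mainTopic) {
        return mainTopic + DEFAULT_DLT_SUFFIX;
    }

    // Override behaviour: use the pre-created DLT name if one is registered
    // for the main topic, otherwise fall back to the suffixed name.
    static String resolveDltName(String mainTopic, Map<String, String> existingDltTopics) {
        return existingDltTopics.getOrDefault(mainTopic, defaultDltName(mainTopic));
    }

    public static void main(String[] args) {
        Map<String, String> existing = Map.of("my_topic", "my_dlt_topic");
        System.out.println(defaultDltName("my_topic"));              // my_topic-dlt
        System.out.println(resolveDltName("my_topic", existing));    // my_dlt_topic
        System.out.println(resolveDltName("other_topic", existing)); // other_topic-dlt
    }
}
```

In the Spring example above, the custom RetryTopicNamesProviderFactory plays the role of resolveDltName: it returns the fixed, already existing names instead of the suffixed ones.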