Force Spring Kafka not to create topics automatically, but to use already created ones
I want to implement a very simple case:
I have a main topic and a DLT topic:
MessageBus:
  Topic: my_topic
  DltTopic: my_dlt_topic
  Broker: event-serv:9092
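So these topics are predefined, and I don't need them to be created automatically.
The only thing I need is to handle broken messages automatically without retrying, because retrying them makes no sense, so I have something like this: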
@KafkaListener(topics = ["#{config.messageBus.topic}"], groupId = "group_id")
@RetryableTopic(
    dltStrategy = DltStrategy.FAIL_ON_ERROR,
    autoCreateTopics = "false",
    attempts = "1"
)
@Throws(IOException::class)
fun consume(rawMessage: String?) {
    ...
}

@DltHandler
fun processMessage(rawMessage: String?) {
    kafkaTemplate.send(config.messageBus.dltTopic, rawMessage)
}
That, of course, doesn't work properly.
I also tried to specify a kafkaTemplate:
@Bean
fun kafkaTemplate(
    config: Config,
    producerFactory: ProducerFactory<String, String>
): KafkaTemplate<String, String> {
    val template = KafkaTemplate(producerFactory)
    template.defaultTopic = config.messageBus.dltTopic
    return template
}
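However, this didn't change the situation.
In the end, I believe there is an obvious solution, so please give me a hint.
That's a Kafka server (broker) configuration, so you have to set it on the server. The relevant property is: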
auto.create.topics.enable (true by default)
@SpringBootApplication
public class So69317126Application {

    public static void main(String[] args) {
        SpringApplication.run(So69317126Application.class, args);
    }

    @RetryableTopic(attempts = "1", autoCreateTopics = "false", dltStrategy = DltStrategy.FAIL_ON_ERROR)
    @KafkaListener(id = "so69317126", topics = "so69317126")
    void listen(String in) {
        System.out.println(in);
        throw new RuntimeException();
    }

    @DltHandler
    void handler(String in) {
        System.out.println("DLT: " + in);
    }

    @Bean
    RetryTopicNamesProviderFactory namer() {
        return new RetryTopicNamesProviderFactory() {

            @Override
            public RetryTopicNamesProvider createRetryTopicNamesProvider(Properties properties) {
                if (properties.isMainEndpoint()) {
                    return new SuffixingRetryTopicNamesProviderFactory.SuffixingRetryTopicNamesProvider(properties) {

                        @Override
                        public String getTopicName(String topic) {
                            return "so69317126";
                        }

                    };
                }
                else if (properties.isDltTopic()) {
                    return new SuffixingRetryTopicNamesProviderFactory.SuffixingRetryTopicNamesProvider(properties) {

                        @Override
                        public String getTopicName(String topic) {
                            return "so69317126.DLT";
                        }

                    };
                }
                else {
                    throw new IllegalStateException("Shouldn't get here - attempts is only 1");
                }
            }

        };
    }

}
so69317126: partitions assigned: [so69317126-0]
so69317126-dlt: partitions assigned: [so69317126.DLT-0]
foo
DLT: foo
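To make the naming difference concrete, here is a minimal plain-Java sketch of the two strategies, with no Spring dependency. It is only a model of the idea, not Spring Kafka code: the class `TopicNameSketch`, the enum `Kind`, and the methods `suffixed` and `fixed` are hypothetical names invented for illustration. It assumes the default behaviour of deriving the DLT name by appending "-dlt" to the main topic, and contrasts that with returning fixed, pre-created names, which is what the custom provider above does. The topic names are taken from the question's configuration.

```java
import java.util.Map;

// Illustrative model only; these types are not part of Spring Kafka.
final class TopicNameSketch {

    enum Kind { MAIN, DLT }

    // Suffix-style naming: the DLT name is derived from the main topic
    // (assumes the default "-dlt" suffix).
    static String suffixed(String mainTopic, Kind kind) {
        return kind == Kind.DLT ? mainTopic + "-dlt" : mainTopic;
    }

    // Fixed naming: each kind maps to a topic that already exists on the broker.
    static String fixed(Map<Kind, String> names, Kind kind) {
        String name = names.get(kind);
        if (name == null) {
            throw new IllegalStateException("No topic configured for " + kind);
        }
        return name;
    }

    public static void main(String[] args) {
        Map<Kind, String> names = Map.of(
                Kind.MAIN, "my_topic",
                Kind.DLT, "my_dlt_topic");

        System.out.println(suffixed("my_topic", Kind.DLT)); // prints "my_topic-dlt"
        System.out.println(fixed(names, Kind.DLT));         // prints "my_dlt_topic"
    }
}
```

With suffix-style naming the framework would look for my_topic-dlt, which does not exist when only my_topic and my_dlt_topic were created in advance; the fixed mapping points at the predefined topic instead.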