如何在一个 Kafka 侦听器的单独配置属性中指定多个主题?
How to specify multiple topics in separate config properties for one Kafka listener?
我想创建一个 spring 从多个 Kafka 主题读取的启动应用程序。我知道我可以在我的 appliation.properties 上创建一个逗号分隔的主题列表,但是我希望单独列出主题名称以提高可读性,这样我就可以使用每个主题名称来计算如何处理消息。
我发现了以下问题,但它们的主题都以逗号分隔的数组形式列出:
Consume multiple topics in one listener in spring boot kafka
我最接近的是:
application.properties
kafka.topic1=topic1
kafka.topic2=topic2
卡夫卡消费者
@KafkaListener(topics = "#{'${kafka.topic1}'},#{'${kafka.topic2}'}")
public void receive(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(required = false, name= KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Payload(required = false) String payload) throws IOException {
}
这给出了错误:
Caused by: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [topic1,topic2]
我知道我需要它是 {"topic1", "topic2} 但我不知道怎么做。
注释 @KafkaListener(topics = "#{'${kafka.topic1}'}")
正确订阅了第一个主题。如果我将其更改为 @KafkaListener(topics = "#{'${kafka.topic2}'}")
我可以正确订阅第二个主题。
只是在注释中创建主题数组,我无法理解。
任何帮助都会很棒
@KafkaListener(id = "so71497475", topics = { "${kafka.topic1}", "${kafka.topic2}" })
编辑
这是一种更复杂的技术,可以让您在不更改任何代码的情况下添加更多主题:
@SpringBootApplication
@EnableConfigurationProperties
public class So71497475Application {
public static void main(String[] args) {
SpringApplication.run(So71497475Application.class, args);
}
@KafkaListener(id = "so71497475", topics = "#{@myProps.kafkaTopics}")
void listen(String in) {
System.out.println(in);
}
@Bean // This will add the topics to the broker if not present
KafkaAdmin.NewTopics topics(MyProps props) {
return new KafkaAdmin.NewTopics(props.getTopics().stream()
.map(t -> TopicBuilder.name(t).partitions(1).replicas(1).build())
.toArray(size -> new NewTopic[size]));
}
}
@ConfigurationProperties("my.kafka")
@Component
class MyProps {
private List<String> topics = new ArrayList<>();
public List<String> getTopics() {
return this.topics;
}
public void setTopics(List<String> topics) {
this.topics = topics;
}
public String[] getKafkaTopics() {
return this.topics.toArray(new String[0]);
}
}
my.kafka.topics[0]=topic1
my.kafka.topics[1]=topic2
my.kafka.topics[2]=topic3
so71497475: partitions assigned: [topic1-0, topic2-0, topic3-0]
如果您将主题配置为逗号分隔 luke:
kafka.topics = topic1,topic2
在这种情况下,您可以简单地使用:
@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}")
void listen(){}
我想创建一个 spring 从多个 Kafka 主题读取的启动应用程序。我知道我可以在我的 appliation.properties 上创建一个逗号分隔的主题列表,但是我希望单独列出主题名称以提高可读性,这样我就可以使用每个主题名称来计算如何处理消息。
我发现了以下问题,但它们的主题都以逗号分隔的数组形式列出:
Consume multiple topics in one listener in spring boot kafka
我最接近的是:
application.properties
kafka.topic1=topic1
kafka.topic2=topic2
卡夫卡消费者
@KafkaListener(topics = "#{'${kafka.topic1}'},#{'${kafka.topic2}'}")
public void receive(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(required = false, name= KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Payload(required = false) String payload) throws IOException {
}
这给出了错误:
Caused by: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [topic1,topic2]
我知道我需要它是 {"topic1", "topic2} 但我不知道怎么做。
注释 @KafkaListener(topics = "#{'${kafka.topic1}'}")
正确订阅了第一个主题。如果我将其更改为 @KafkaListener(topics = "#{'${kafka.topic2}'}")
我可以正确订阅第二个主题。
只是在注释中创建主题数组,我无法理解。
任何帮助都会很棒
@KafkaListener(id = "so71497475", topics = { "${kafka.topic1}", "${kafka.topic2}" })
编辑
这是一种更复杂的技术,可以让您在不更改任何代码的情况下添加更多主题:
@SpringBootApplication
@EnableConfigurationProperties
public class So71497475Application {
public static void main(String[] args) {
SpringApplication.run(So71497475Application.class, args);
}
@KafkaListener(id = "so71497475", topics = "#{@myProps.kafkaTopics}")
void listen(String in) {
System.out.println(in);
}
@Bean // This will add the topics to the broker if not present
KafkaAdmin.NewTopics topics(MyProps props) {
return new KafkaAdmin.NewTopics(props.getTopics().stream()
.map(t -> TopicBuilder.name(t).partitions(1).replicas(1).build())
.toArray(size -> new NewTopic[size]));
}
}
@ConfigurationProperties("my.kafka")
@Component
class MyProps {
private List<String> topics = new ArrayList<>();
public List<String> getTopics() {
return this.topics;
}
public void setTopics(List<String> topics) {
this.topics = topics;
}
public String[] getKafkaTopics() {
return this.topics.toArray(new String[0]);
}
}
my.kafka.topics[0]=topic1
my.kafka.topics[1]=topic2
my.kafka.topics[2]=topic3
so71497475: partitions assigned: [topic1-0, topic2-0, topic3-0]
如果您将主题配置为逗号分隔 luke:
kafka.topics = topic1,topic2
在这种情况下,您可以简单地使用:
@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}")
void listen(){}