Spring Kafka Consumer - 从复杂对象中获取主题
Spring Kafka Consumer - Getting topics from a complex object
我正在开发 spring 引导 kafka 消费者应用程序。它将有不同的消费者处理不同的主题。消费者的所有信息都来自 application.yml 文件。
application:
kafka:
consumer-config:
- name: consumer-a
topics: topic1,topic2
......
- name: consumer-b
topics: topic3,topic4
.....
我无法将主题列表从应用程序属性设置到 KafkaListener。
我尝试了以下方法:
@KafkaListener(topics = "#{'${application.kafka.consumer-config[0].topics}'.split(',')}",containerFactory = "kafkaListenerContainerFactory")
@KafkaListener(topics = "#{'${application.kafka.consumer-config.?[name == 'consumer-a'].topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
在这两种情况下,我都收到以下错误:
java.lang.IllegalArgumentException: 无法解析占位符
从应用程序属性获取主题并将其设置在 KafkaListener 主题上的最佳方法是什么?
您使用的是什么版本?我刚刚测试了它,它工作正常...
@SpringBootApplication
public class So63583349Application {
public static void main(String[] args) {
SpringApplication.run(So63583349Application.class, args);
}
@KafkaListener(topics = "#{'${application.kafka.consumer-config[0].topics}'.split(',')}", id = "so63583349")
public void listen(String in) {
System.out.println(in);
}
}
2020-08-25 13:02:28.384 WARN 66237 --- [o63583349-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-so63583349-1, groupId=so63583349] Error while fetching metadata with correlation id 41 : {topic1=UNKNOWN_TOPIC_OR_PARTITION, topic2=UNKNOWN_TOPIC_OR_PARTITION}
对于第二个,您不能在 属性 占位符内使用 SpEL 选择。这是针对这种情况的一种解决方案:
@SpringBootApplication
public class So63583349Application {
public static void main(String[] args) {
SpringApplication.run(So63583349Application.class, args);
}
@KafkaListener(topics = "#{@props.consumerConfig.?[name == 'consumer-a'].get(0).topics.split(',')}",
id = "so63583349")
public void listen(String in) {
System.out.println(in);
}
@Bean
Props props() {
return new Props();
}
}
@ConfigurationProperties(value = "application.kafka")
class Props {
List<Properties> consumerConfig;
public List<Properties> getConsumerConfig() {
return this.consumerConfig;
}
public void setConsumerConfig(List<Properties> consumerConfig) {
this.consumerConfig = consumerConfig;
}
}
我正在开发 spring 引导 kafka 消费者应用程序。它将有不同的消费者处理不同的主题。消费者的所有信息都来自 application.yml 文件。
application:
kafka:
consumer-config:
- name: consumer-a
topics: topic1,topic2
......
- name: consumer-b
topics: topic3,topic4
.....
我无法将主题列表从应用程序属性设置到 KafkaListener。
我尝试了以下方法:
@KafkaListener(topics = "#{'${application.kafka.consumer-config[0].topics}'.split(',')}",containerFactory = "kafkaListenerContainerFactory")
@KafkaListener(topics = "#{'${application.kafka.consumer-config.?[name == 'consumer-a'].topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
在这两种情况下,我都收到以下错误:
java.lang.IllegalArgumentException: 无法解析占位符
从应用程序属性获取主题并将其设置在 KafkaListener 主题上的最佳方法是什么?
您使用的是什么版本?我刚刚测试了它,它工作正常...
@SpringBootApplication
public class So63583349Application {
public static void main(String[] args) {
SpringApplication.run(So63583349Application.class, args);
}
@KafkaListener(topics = "#{'${application.kafka.consumer-config[0].topics}'.split(',')}", id = "so63583349")
public void listen(String in) {
System.out.println(in);
}
}
2020-08-25 13:02:28.384 WARN 66237 --- [o63583349-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-so63583349-1, groupId=so63583349] Error while fetching metadata with correlation id 41 : {topic1=UNKNOWN_TOPIC_OR_PARTITION, topic2=UNKNOWN_TOPIC_OR_PARTITION}
对于第二个,您不能在 属性 占位符内使用 SpEL 选择。这是针对这种情况的一种解决方案:
@SpringBootApplication
public class So63583349Application {
public static void main(String[] args) {
SpringApplication.run(So63583349Application.class, args);
}
@KafkaListener(topics = "#{@props.consumerConfig.?[name == 'consumer-a'].get(0).topics.split(',')}",
id = "so63583349")
public void listen(String in) {
System.out.println(in);
}
@Bean
Props props() {
return new Props();
}
}
@ConfigurationProperties(value = "application.kafka")
class Props {
List<Properties> consumerConfig;
public List<Properties> getConsumerConfig() {
return this.consumerConfig;
}
public void setConsumerConfig(List<Properties> consumerConfig) {
this.consumerConfig = consumerConfig;
}
}