Spring Kafka Consumer - 从复杂对象中获取主题

Spring Kafka Consumer - Getting topics from a complex object

我正在开发 spring 引导 kafka 消费者应用程序。它将有不同的消费者处理不同的主题。消费者的所有信息都来自 application.yml 文件。

application:
  kafka:
    consumer-config:
      - name: consumer-a
        topics: topic1,topic2
        ......
      - name: consumer-b
        topics: topic3,topic4
        .....

我无法将主题列表从应用程序属性设置到 KafkaListener。

我尝试了以下方法:

@KafkaListener(topics = "#{'${application.kafka.consumer-config[0].topics}'.split(',')}",containerFactory = "kafkaListenerContainerFactory")


@KafkaListener(topics = "#{'${application.kafka.consumer-config.?[name == 'consumer-a'].topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")

在这两种情况下,我都收到以下错误:

java.lang.IllegalArgumentException: 无法解析占位符

从应用程序属性获取主题并将其设置在 KafkaListener 主题上的最佳方法是什么?

您使用的是什么版本?我刚刚测试了它,它工作正常...

@SpringBootApplication
public class So63583349Application {

    public static void main(String[] args) {
        SpringApplication.run(So63583349Application.class, args);
    }

    @KafkaListener(topics = "#{'${application.kafka.consumer-config[0].topics}'.split(',')}", id = "so63583349")
    public void listen(String in) {
        System.out.println(in);
    }

}

2020-08-25 13:02:28.384 WARN 66237 --- [o63583349-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-so63583349-1, groupId=so63583349] Error while fetching metadata with correlation id 41 : {topic1=UNKNOWN_TOPIC_OR_PARTITION, topic2=UNKNOWN_TOPIC_OR_PARTITION}

对于第二个,您不能在 属性 占位符内使用 SpEL 选择。这是针对这种情况的一种解决方案:

@SpringBootApplication
public class So63583349Application {

    public static void main(String[] args) {
        SpringApplication.run(So63583349Application.class, args);
    }

    @KafkaListener(topics = "#{@props.consumerConfig.?[name == 'consumer-a'].get(0).topics.split(',')}",
            id = "so63583349")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    Props props() {
        return new Props();
    }

}

@ConfigurationProperties(value = "application.kafka")
class Props {

    List<Properties> consumerConfig;

    public List<Properties> getConsumerConfig() {
        return this.consumerConfig;
    }

    public void setConsumerConfig(List<Properties> consumerConfig) {
        this.consumerConfig = consumerConfig;
    }

}