使用 Spring EL 将属性中的可选后缀添加到 @KafkaListener 中的 consumerGroup

Using Spring EL to add optional postfix from properties to consumerGroup in @KafkaListener

我有一个简单的 spring 引导应用程序和 Kafka 消费者,看起来像

@KafkaListener(topics="topic", groupId="SOME_CONSTANT") {
....
}

我需要做的是添加可选的 spring boot 属性 (来自环境变量,但这并不重要)让我们说: myapp.env: TEST

并且当该变量存在时,我应该自动将消费者组更新为 SOME_CONSTANT-TEST

我在玩SPEL

@KafkaListener(topics="topic", groupId="#{ '${myApp.env}' == null ? 'SOME_CONSTANT' : 'SOME_CONSTANT' + '-' + '${myApp.env}}'") {
....
}

但这似乎行不通:/有什么想法吗?

可以使用T运算符读取常量的值,在没有环境变量的情况下使用冒号':':

@KafkaListener(topics="topic", groupId="#{ '${my.app.env:}' == '' ? T(com.mypackage.MyListener).SOME_CONSTANT : T(com.mypackage.MyListener).SOME_CONSTANT + '-' + '${my.app.env:}'}")

这是一个使用此解决方案的示例应用程序:

package org.spring.kafka.playground;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@SpringBootApplication
public class SO71291726 {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(SO71291726.class, args);
        try {
            Thread.sleep(10000);
        }
        catch (InterruptedException e) {
            Thread.interrupted();
            throw new RuntimeException("Interrupted");
        }
        KafkaOperations kafkaTemplate = context.getBean("kafkaTemplate", KafkaOperations.class);
        kafkaTemplate.send("topic", "My message");
    }

    Logger log = LoggerFactory.getLogger(this.getClass());

    public static final String SOME_CONSTANT = "my-group-id-constant";

    @Component
    class MyListener {

        @KafkaListener(topics="topic", groupId="#{ '${71291726.my.app.env:}' == '' ? T(org.spring.kafka.playground.SO71291726).SOME_CONSTANT : T(org.spring.kafka.playground.SO71291726).SOME_CONSTANT + '-' + '${71291726.my.app.env:}'}")
        void listen(String message, @Header(KafkaHeaders.GROUP_ID) String groupId) {
            log.info("Received message {} from group id {} ", message, groupId);
        }
    }
}

输出: 2022-02-28 14:26:14.733 INFO 18841 --- [ntainer#0-0-C-1] 1291726$$EnhancerBySpringCGLIB$$cf264156 : Received message My message from group id my-group-id-constant

如果我将 71291726.my.app.env = TEST 添加到 application.properties 文件:

2022-02-28 14:34:03.900 INFO 18870 --- [ntainer#0-0-C-1] 1291726$$EnhancerBySpringCGLIB$$e1a5933e : Received message My message from group id my-group-id-constant-TEST