使用 Spring EL 将属性中的可选后缀添加到 @KafkaListener 中的 consumerGroup
Using Spring EL to add optional postfix from properties to consumerGroup in @KafkaListener
我有一个简单的 spring 引导应用程序和 Kafka 消费者,看起来像
@KafkaListener(topics="topic", groupId="SOME_CONSTANT") {
....
}
我需要做的是添加可选的 spring boot 属性 (来自环境变量,但这并不重要)让我们说:
myapp.env: TEST
并且当该变量存在时,我应该自动将消费者组更新为
SOME_CONSTANT-TEST
我在玩SPEL
@KafkaListener(topics="topic", groupId="#{ '${myApp.env}' == null ? 'SOME_CONSTANT' : 'SOME_CONSTANT' + '-' + '${myApp.env}}'") {
....
}
但这似乎行不通:/有什么想法吗?
可以使用T
运算符读取常量的值,在没有环境变量的情况下使用冒号':':
@KafkaListener(topics="topic", groupId="#{ '${my.app.env:}' == '' ? T(com.mypackage.MyListener).SOME_CONSTANT : T(com.mypackage.MyListener).SOME_CONSTANT + '-' + '${my.app.env:}'}")
这是一个使用此解决方案的示例应用程序:
package org.spring.kafka.playground;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
@SpringBootApplication
public class SO71291726 {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(SO71291726.class, args);
try {
Thread.sleep(10000);
}
catch (InterruptedException e) {
Thread.interrupted();
throw new RuntimeException("Interrupted");
}
KafkaOperations kafkaTemplate = context.getBean("kafkaTemplate", KafkaOperations.class);
kafkaTemplate.send("topic", "My message");
}
Logger log = LoggerFactory.getLogger(this.getClass());
public static final String SOME_CONSTANT = "my-group-id-constant";
@Component
class MyListener {
@KafkaListener(topics="topic", groupId="#{ '${71291726.my.app.env:}' == '' ? T(org.spring.kafka.playground.SO71291726).SOME_CONSTANT : T(org.spring.kafka.playground.SO71291726).SOME_CONSTANT + '-' + '${71291726.my.app.env:}'}")
void listen(String message, @Header(KafkaHeaders.GROUP_ID) String groupId) {
log.info("Received message {} from group id {} ", message, groupId);
}
}
}
输出:
2022-02-28 14:26:14.733 INFO 18841 --- [ntainer#0-0-C-1] 1291726$$EnhancerBySpringCGLIB$$cf264156 : Received message My message from group id my-group-id-constant
如果我将 71291726.my.app.env = TEST
添加到 application.properties
文件:
2022-02-28 14:34:03.900 INFO 18870 --- [ntainer#0-0-C-1] 1291726$$EnhancerBySpringCGLIB$$e1a5933e : Received message My message from group id my-group-id-constant-TEST
我有一个简单的 spring 引导应用程序和 Kafka 消费者,看起来像
@KafkaListener(topics="topic", groupId="SOME_CONSTANT") {
....
}
我需要做的是添加可选的 spring boot 属性 (来自环境变量,但这并不重要)让我们说:
myapp.env: TEST
并且当该变量存在时,我应该自动将消费者组更新为
SOME_CONSTANT-TEST
我在玩SPEL
@KafkaListener(topics="topic", groupId="#{ '${myApp.env}' == null ? 'SOME_CONSTANT' : 'SOME_CONSTANT' + '-' + '${myApp.env}}'") {
....
}
但这似乎行不通:/有什么想法吗?
可以使用T
运算符读取常量的值,在没有环境变量的情况下使用冒号':':
@KafkaListener(topics="topic", groupId="#{ '${my.app.env:}' == '' ? T(com.mypackage.MyListener).SOME_CONSTANT : T(com.mypackage.MyListener).SOME_CONSTANT + '-' + '${my.app.env:}'}")
这是一个使用此解决方案的示例应用程序:
package org.spring.kafka.playground;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
@SpringBootApplication
public class SO71291726 {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(SO71291726.class, args);
try {
Thread.sleep(10000);
}
catch (InterruptedException e) {
Thread.interrupted();
throw new RuntimeException("Interrupted");
}
KafkaOperations kafkaTemplate = context.getBean("kafkaTemplate", KafkaOperations.class);
kafkaTemplate.send("topic", "My message");
}
Logger log = LoggerFactory.getLogger(this.getClass());
public static final String SOME_CONSTANT = "my-group-id-constant";
@Component
class MyListener {
@KafkaListener(topics="topic", groupId="#{ '${71291726.my.app.env:}' == '' ? T(org.spring.kafka.playground.SO71291726).SOME_CONSTANT : T(org.spring.kafka.playground.SO71291726).SOME_CONSTANT + '-' + '${71291726.my.app.env:}'}")
void listen(String message, @Header(KafkaHeaders.GROUP_ID) String groupId) {
log.info("Received message {} from group id {} ", message, groupId);
}
}
}
输出:
2022-02-28 14:26:14.733 INFO 18841 --- [ntainer#0-0-C-1] 1291726$$EnhancerBySpringCGLIB$$cf264156 : Received message My message from group id my-group-id-constant
如果我将 71291726.my.app.env = TEST
添加到 application.properties
文件:
2022-02-28 14:34:03.900 INFO 18870 --- [ntainer#0-0-C-1] 1291726$$EnhancerBySpringCGLIB$$e1a5933e : Received message My message from group id my-group-id-constant-TEST