Spring 表达式语言问题
Spring Expression Language issue
我有以下class。我已经在控制台中验证,在解析 Kafka 侦听器中的主题占位符值之前调用了此 class 的构造函数(在创建 bean 期间):
public class MsgReceiver<MSG> extends AbstractMsgReceiver<MSG> implements
MessageReceiver<MSG> {
@SuppressWarnings("unused")
private String topic;
public MsgReceiver(String topic, MessageHandler<MSG> handler) {
super(handler);
this.topic = topic;
}
@KafkaListener(topics = "${my.messenger.kafka.topics.#{${topic}}.value}", groupId = "${my.messenger.kafka.topics.#{${topic}}.groupId}")
public void receiveMessage(@Headers Map<String, Object> headers, @Payload MSG payload) {
System.out.println("Received "+payload);
super.receiveMessage(headers, payload);
}
}
我的 application.yml 如下:
my:
messenger:
kafka:
address: localhost:9092
topics:
topic_1:
value: my_topic
groupId: 1
在创建 bean 期间,我传递了“topic_1”,我希望它应该在 Kafka 侦听器主题占位符中动态使用。我按照代码本身所示进行了尝试,但它不起作用。请建议如何做到这一点。
在评估 SpEL 之前解析占位符;您不能使用 SpEL 动态构建占位符名称。另外,您不能像那样引用字段;您必须通过 bean 名称(和 public getter)间接执行此操作。
因此,要执行您想要的操作,您必须添加 getter 并在使用 SpEL 构建 属性 名称后从环境中动态获取 属性。
有一个特殊的标记 __listener
允许您引用当前 bean。
把它们放在一起...
@SpringBootApplication
public class So63056065Application {
public static void main(String[] args) {
SpringApplication.run(So63056065Application.class, args);
}
@Bean
public MyReceiver receiver() {
return new MyReceiver("topic_1");
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("my_topic").partitions(1).replicas(1).build();
}
}
class MyReceiver {
private final String topic;
public MyReceiver(String topic) {
this.topic = topic;
}
public String getTopic() {
return this.topic;
}
@KafkaListener(topics = "#{environment.getProperty('my.messenger.kafka.topics.' + __listener.topic + '.value')}",
groupId = "#{environment.getProperty('my.messenger.kafka.topics.' + __listener.topic + '.groupId')}")
public void listen(String in) {
System.out.println(in);
}
}
结果...
2020-07-23 12:13:44.932 INFO 39561 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = 1
group.instance.id = null
...
和
1: partitions assigned: [my_topic-0]
我有以下class。我已经在控制台中验证,在解析 Kafka 侦听器中的主题占位符值之前调用了此 class 的构造函数(在创建 bean 期间):
public class MsgReceiver<MSG> extends AbstractMsgReceiver<MSG> implements
MessageReceiver<MSG> {
@SuppressWarnings("unused")
private String topic;
public MsgReceiver(String topic, MessageHandler<MSG> handler) {
super(handler);
this.topic = topic;
}
@KafkaListener(topics = "${my.messenger.kafka.topics.#{${topic}}.value}", groupId = "${my.messenger.kafka.topics.#{${topic}}.groupId}")
public void receiveMessage(@Headers Map<String, Object> headers, @Payload MSG payload) {
System.out.println("Received "+payload);
super.receiveMessage(headers, payload);
}
}
我的 application.yml 如下:
my:
messenger:
kafka:
address: localhost:9092
topics:
topic_1:
value: my_topic
groupId: 1
在创建 bean 期间,我传递了“topic_1”,我希望它应该在 Kafka 侦听器主题占位符中动态使用。我按照代码本身所示进行了尝试,但它不起作用。请建议如何做到这一点。
在评估 SpEL 之前解析占位符;您不能使用 SpEL 动态构建占位符名称。另外,您不能像那样引用字段;您必须通过 bean 名称(和 public getter)间接执行此操作。
因此,要执行您想要的操作,您必须添加 getter 并在使用 SpEL 构建 属性 名称后从环境中动态获取 属性。
有一个特殊的标记 __listener
允许您引用当前 bean。
把它们放在一起...
@SpringBootApplication
public class So63056065Application {
public static void main(String[] args) {
SpringApplication.run(So63056065Application.class, args);
}
@Bean
public MyReceiver receiver() {
return new MyReceiver("topic_1");
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("my_topic").partitions(1).replicas(1).build();
}
}
class MyReceiver {
private final String topic;
public MyReceiver(String topic) {
this.topic = topic;
}
public String getTopic() {
return this.topic;
}
@KafkaListener(topics = "#{environment.getProperty('my.messenger.kafka.topics.' + __listener.topic + '.value')}",
groupId = "#{environment.getProperty('my.messenger.kafka.topics.' + __listener.topic + '.groupId')}")
public void listen(String in) {
System.out.println(in);
}
}
结果...
2020-07-23 12:13:44.932 INFO 39561 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = 1
group.instance.id = null
...
和
1: partitions assigned: [my_topic-0]