启用@KafkaListener 以从 application.yml 文件中获取可变主题名称
Enabling @KafkaListener to take in variable topic names from application.yml file
我试图将多个主题加载到单个 @KafkaListener
但我 运行 遇到了麻烦,因为我认为它正在寻找一个常量值,但正在初始化 topics
变量从 application.yml
文件导致一些问题,我想知道是否有人可以帮助我解决这个问题,或者指导我如何将多个 Kafka 主题加载到单个 KafkaListener 中。
我可以在同一个 @KafkaListener
中收听多个主题,方法是将它们传递到逗号分隔的对象中,如下所示:
@KafkaListener(topics = {
"flight-events",
"flight-time-events",
"service-events",
"flight-delay-events"
})
我知道我可以用逗号分隔的值来表示主题,但我希望能够通过配置文件添加主题,而不是更改代码库中的代码。
我认为可能存在问题,因为@KafkaListener 需要接受一个常量值,而我无法将注解定义为常量,有什么办法可以解决这个问题吗?
KafkaWebSocketConnector.java
@Component
public class KafkaWebSocketConnector
{
@Value("${spring.kafka.topics}")
private String[] topics;
@KafkaListener(topics = topics)
public void listen(ConsumerRecord<?, Map<String, String>> message)
{
log.info("Received messages on topic [{}]: [{}]", message.topic(), message.value());
String dest = "/" + message.topic();
log.info("destination = {}", dest);
log.info("msg: {}", message);
messageTemplate.convertAndSend(dest, message.value());
}
}
application.yml
spring:
kafka:
consumer:
auto-offset-reset: earliest
group-id: kafka-websocket-connector
topics: flight-events,
flight-time-events,
canceled-events,
pax-events,
flight-delay-events
@Gary Russell 在此 GitHub 问题中提供的答案:
https://github.com/spring-projects/spring-kafka/issues/361
您可以使用 SpEL 表达式; EnableKafkaIntegrationTests 中有一个示例...
@KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}")
以我为例"#{'${spring.kafka.topics}'.split(',')}"
为了回答上述问题,我能够实现上述代码(由 Gary Russell 提供)。
我试图将多个主题加载到单个 @KafkaListener
但我 运行 遇到了麻烦,因为我认为它正在寻找一个常量值,但正在初始化 topics
变量从 application.yml
文件导致一些问题,我想知道是否有人可以帮助我解决这个问题,或者指导我如何将多个 Kafka 主题加载到单个 KafkaListener 中。
我可以在同一个 @KafkaListener
中收听多个主题,方法是将它们传递到逗号分隔的对象中,如下所示:
@KafkaListener(topics = {
"flight-events",
"flight-time-events",
"service-events",
"flight-delay-events"
})
我知道我可以用逗号分隔的值来表示主题,但我希望能够通过配置文件添加主题,而不是更改代码库中的代码。
我认为可能存在问题,因为@KafkaListener 需要接受一个常量值,而我无法将注解定义为常量,有什么办法可以解决这个问题吗?
KafkaWebSocketConnector.java
@Component
public class KafkaWebSocketConnector
{
@Value("${spring.kafka.topics}")
private String[] topics;
@KafkaListener(topics = topics)
public void listen(ConsumerRecord<?, Map<String, String>> message)
{
log.info("Received messages on topic [{}]: [{}]", message.topic(), message.value());
String dest = "/" + message.topic();
log.info("destination = {}", dest);
log.info("msg: {}", message);
messageTemplate.convertAndSend(dest, message.value());
}
}
application.yml
spring:
kafka:
consumer:
auto-offset-reset: earliest
group-id: kafka-websocket-connector
topics: flight-events,
flight-time-events,
canceled-events,
pax-events,
flight-delay-events
@Gary Russell 在此 GitHub 问题中提供的答案:
https://github.com/spring-projects/spring-kafka/issues/361
您可以使用 SpEL 表达式; EnableKafkaIntegrationTests 中有一个示例...
@KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}")
以我为例"#{'${spring.kafka.topics}'.split(',')}"
为了回答上述问题,我能够实现上述代码(由 Gary Russell 提供)。