如何通过代码在 KafkaListener 中设置主题名称?
How to set topic name in KafkaListener by code?
我正在为 kafka 主题编写侦听器并使用 @kafkaListener 注释来执行相同的操作。到现在为止,我对主题名称 (topic1) 进行了硬编码并且它正在运行 fine.Here 是一个有效的代码 :-
@Component
public class KafkaConsumer {
@Autowired
private KafkaProperties kafkaProps;
@Autowired
@Qualifier("CustomObjectMapper")
private ObjectMapper objectMapper;
@KafkaListener(topics = "topic1", containerFactory = "createPokafkaListenerContainerFactory")
public void CreatePoListener(PurchaseOrder po, Acknowledgment ack)
throws JsonProcessingException {
LOG.info("Received message for po create from kafka topic {} is {}",
kafkaProps.getOmsCreateTopicName(), objectMapper
.writerWithDefaultPrettyPrinter().writeValueAsString(po));
ack.acknowledge();
}
}
现在,当我尝试更改代码以从代码中获取主题名称时,它不起作用。在堆栈溢出(How to pass dynamic topic name to @kafkalistener(topics from environment variable)中关注此页面后,我试图从代码中获取值的代码是:-
@Component
public class KafkaConsumer {
@Autowired
private KafkaProperties kafkaProps;
public KafkaProperties getKafkaProps() {
return kafkaProps;
}
@Autowired
@Qualifier("CustomObjectMapper")
private ObjectMapper objectMapper;
@KafkaListener(topics = "#{__listener.kafkaProps.getOmsCreateTopicName()}", containerFactory = "omsCreatePokafkaListenerContainerFactory")
public void CreatePoListener(PurchaseOrder po, Acknowledgment ack)
throws JsonProcessingException {
LOG.info("Received message for po create from kafka topic {} is {}",
kafkaProps.getOmsCreateTopicName(), objectMapper
.writerWithDefaultPrettyPrinter().writeValueAsString(po));
ack.acknowledge();
}
}
当我尝试连接服务器时,它抛出错误:-
bean初始化失败;嵌套异常是 org.springframework.beans.factory.BeanExpressionException:表达式解析失败;嵌套异常是 org.springframework.expression.spel.SpelEvaluationException: EL1008E: 属性 或在 'org.springframework.beans.factory.config.BeanExpressionContext' 类型的对象上找不到字段 '__listener' - 可能不是 public?
有人可以帮我看看我做错了什么吗?
1.1.x不再支持;如果你不能升级到 Spring Framework 4.3,你应该升级到 1.3.10,这是与 Spring 4.3 一起使用的最新版本;它有一个比 1.1.x 更简单可靠的线程模型,感谢 KIP-62.
虽然不能将 __listener
与 1.3.x 一起使用,但可以使用 SpEL 表达式直接从 KafkaProperties
bean 中获取主题:
@KafkaListener(topics = "#{kafkaProps.omsCreateTopicName}" ...)
在 Consumer class 中,您需要进行以下更改:
@Autowired
public KafkaProperties kafkaProps;
@KafkaListener(topics = "#{kafkaConsumer.kafkaProps.getOmsCreateTopicName()}"
对我有用。
我正在为 kafka 主题编写侦听器并使用 @kafkaListener 注释来执行相同的操作。到现在为止,我对主题名称 (topic1) 进行了硬编码并且它正在运行 fine.Here 是一个有效的代码 :-
@Component
public class KafkaConsumer {
@Autowired
private KafkaProperties kafkaProps;
@Autowired
@Qualifier("CustomObjectMapper")
private ObjectMapper objectMapper;
@KafkaListener(topics = "topic1", containerFactory = "createPokafkaListenerContainerFactory")
public void CreatePoListener(PurchaseOrder po, Acknowledgment ack)
throws JsonProcessingException {
LOG.info("Received message for po create from kafka topic {} is {}",
kafkaProps.getOmsCreateTopicName(), objectMapper
.writerWithDefaultPrettyPrinter().writeValueAsString(po));
ack.acknowledge();
}
}
现在,当我尝试更改代码以从代码中获取主题名称时,它不起作用。在堆栈溢出(How to pass dynamic topic name to @kafkalistener(topics from environment variable)中关注此页面后,我试图从代码中获取值的代码是:-
@Component
public class KafkaConsumer {
@Autowired
private KafkaProperties kafkaProps;
public KafkaProperties getKafkaProps() {
return kafkaProps;
}
@Autowired
@Qualifier("CustomObjectMapper")
private ObjectMapper objectMapper;
@KafkaListener(topics = "#{__listener.kafkaProps.getOmsCreateTopicName()}", containerFactory = "omsCreatePokafkaListenerContainerFactory")
public void CreatePoListener(PurchaseOrder po, Acknowledgment ack)
throws JsonProcessingException {
LOG.info("Received message for po create from kafka topic {} is {}",
kafkaProps.getOmsCreateTopicName(), objectMapper
.writerWithDefaultPrettyPrinter().writeValueAsString(po));
ack.acknowledge();
}
}
当我尝试连接服务器时,它抛出错误:-
bean初始化失败;嵌套异常是 org.springframework.beans.factory.BeanExpressionException:表达式解析失败;嵌套异常是 org.springframework.expression.spel.SpelEvaluationException: EL1008E: 属性 或在 'org.springframework.beans.factory.config.BeanExpressionContext' 类型的对象上找不到字段 '__listener' - 可能不是 public?
有人可以帮我看看我做错了什么吗?
1.1.x不再支持;如果你不能升级到 Spring Framework 4.3,你应该升级到 1.3.10,这是与 Spring 4.3 一起使用的最新版本;它有一个比 1.1.x 更简单可靠的线程模型,感谢 KIP-62.
虽然不能将 __listener
与 1.3.x 一起使用,但可以使用 SpEL 表达式直接从 KafkaProperties
bean 中获取主题:
@KafkaListener(topics = "#{kafkaProps.omsCreateTopicName}" ...)
在 Consumer class 中,您需要进行以下更改:
@Autowired
public KafkaProperties kafkaProps;
@KafkaListener(topics = "#{kafkaConsumer.kafkaProps.getOmsCreateTopicName()}"
对我有用。