带有事件监听器的 BeanFactory
BeanFactory with Event listener
我正在使用 ConfigurableListableBeanFactory 和 registerSingleton 方法动态创建 bean,之后我想使用 EventListener 侦听事件。
在示例中,Kafka Listener 正确接收到消息,但没有触发 EventListener,这是因为 BeanFactory 不支持 ApplicationEvents,那么如何将 EventListener 添加到 BeanFactory?
此致!
@PostConstruct
public void setup() {
final ConfigurableListableBeanFactory beanFactory = ((ConfigurableApplicationContext) applicationContext).getBeanFactory();
kafkaConfiguration.getTopics().keySet().forEach(key -> {
Topic topic = kafkaConfiguration.getTopics().get(key);
DefaultKafkaConsumerFactory<String, KafkaEntity> defaultKafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(
consumerConfigs(),
new StringDeserializer(),
new JsonDeserializer<>(topic.getClazz()));
beanFactory.registerSingleton(key, defaultKafkaConsumerFactory);
ConcurrentKafkaListenerContainerFactory<String, KafkaEntity> concurrentKafkaListenerContainerFactory =
new ConcurrentKafkaListenerContainerFactory<>();
concurrentKafkaListenerContainerFactory.setConsumerFactory(defaultKafkaConsumerFactory);
concurrentKafkaListenerContainerFactory.setConcurrency(topic.getConcurrency());
concurrentKafkaListenerContainerFactory.setAutoStartup(topic.isAutoStart());
concurrentKafkaListenerContainerFactory.getContainerProperties().setPollTimeout(kafkaConfiguration.getPollTimeout());
concurrentKafkaListenerContainerFactory.getContainerProperties().setIdleEventInterval(1000L);
beanFactory.registerSingleton(CONTAINER_FACTORY_NAME + key, concurrentKafkaListenerContainerFactory);
}
@KafkaListener(id = "kfktest", topics = "data_common-apibridge-service.board", idIsGroup = false,
containerFactory = KafkaConsumerConfiguration.CONTAINER_FACTORY_NAME + "BOARD")
public void listen(@Payload(required = false) BoardMessage message,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
// TODO message handler
}
@EventListener(condition = "event.listenerId.startsWith('kfktest-')")
public void eventHandler(ListenerContainerIdleEvent event) {
log.info(event.getListenerId());
}
经过一些阅读我找到了如何做到这一点,我使用 BeanFactoryPostProcessor 动态定义我在配置服务器中拥有的所有 bean,例如:
public static BeanFactoryPostProcessor beanFactoryPostProcessor(@Value("${kafka.topicNames}") String topicNames) {
之后,我使用@Postconstruct 在另一个@Configuration 中完成了Bean,以便已经实例化了一些依赖项。
@Bean
public static BeanFactoryPostProcessor beanFactoryPostProcessor(@Value("${kafka.topicNames}") String topicNames) {
return beanFactoryPostProcessor -> {
BeanDefinitionRegistry beanDefinitionRegistry = (BeanDefinitionRegistry) beanFactoryPostProcessor;
List<String> topics = Arrays.asList(topicNames.split(Pattern.quote(",")));
topics.forEach(key -> {
beanDefinitionRegistry.registerBeanDefinition(KafkaConsumerConfiguration.DEFAULT_KAFKA_CONSUMER_FACTORY_NAME + key,
BeanDefinitionBuilder.genericBeanDefinition(KafkaConsumerFactory.class)
.getBeanDefinition());
beanDefinitionRegistry.registerBeanDefinition(KafkaConsumerConfiguration.CONTAINER_FACTORY_NAME + key,
BeanDefinitionBuilder.genericBeanDefinition(ConcurrentKafkaListenerContainerFactory.class)
.getBeanDefinition());
});
log.info("Topics bean defined {}", topics.size());
};
}
此致!
我正在使用 ConfigurableListableBeanFactory 和 registerSingleton 方法动态创建 bean,之后我想使用 EventListener 侦听事件。 在示例中,Kafka Listener 正确接收到消息,但没有触发 EventListener,这是因为 BeanFactory 不支持 ApplicationEvents,那么如何将 EventListener 添加到 BeanFactory?
此致!
@PostConstruct
public void setup() {
final ConfigurableListableBeanFactory beanFactory = ((ConfigurableApplicationContext) applicationContext).getBeanFactory();
kafkaConfiguration.getTopics().keySet().forEach(key -> {
Topic topic = kafkaConfiguration.getTopics().get(key);
DefaultKafkaConsumerFactory<String, KafkaEntity> defaultKafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(
consumerConfigs(),
new StringDeserializer(),
new JsonDeserializer<>(topic.getClazz()));
beanFactory.registerSingleton(key, defaultKafkaConsumerFactory);
ConcurrentKafkaListenerContainerFactory<String, KafkaEntity> concurrentKafkaListenerContainerFactory =
new ConcurrentKafkaListenerContainerFactory<>();
concurrentKafkaListenerContainerFactory.setConsumerFactory(defaultKafkaConsumerFactory);
concurrentKafkaListenerContainerFactory.setConcurrency(topic.getConcurrency());
concurrentKafkaListenerContainerFactory.setAutoStartup(topic.isAutoStart());
concurrentKafkaListenerContainerFactory.getContainerProperties().setPollTimeout(kafkaConfiguration.getPollTimeout());
concurrentKafkaListenerContainerFactory.getContainerProperties().setIdleEventInterval(1000L);
beanFactory.registerSingleton(CONTAINER_FACTORY_NAME + key, concurrentKafkaListenerContainerFactory);
}
@KafkaListener(id = "kfktest", topics = "data_common-apibridge-service.board", idIsGroup = false,
containerFactory = KafkaConsumerConfiguration.CONTAINER_FACTORY_NAME + "BOARD")
public void listen(@Payload(required = false) BoardMessage message,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
// TODO message handler
}
@EventListener(condition = "event.listenerId.startsWith('kfktest-')")
public void eventHandler(ListenerContainerIdleEvent event) {
log.info(event.getListenerId());
}
经过一些阅读我找到了如何做到这一点,我使用 BeanFactoryPostProcessor 动态定义我在配置服务器中拥有的所有 bean,例如:
public static BeanFactoryPostProcessor beanFactoryPostProcessor(@Value("${kafka.topicNames}") String topicNames) {
之后,我使用@Postconstruct 在另一个@Configuration 中完成了Bean,以便已经实例化了一些依赖项。
@Bean
public static BeanFactoryPostProcessor beanFactoryPostProcessor(@Value("${kafka.topicNames}") String topicNames) {
return beanFactoryPostProcessor -> {
BeanDefinitionRegistry beanDefinitionRegistry = (BeanDefinitionRegistry) beanFactoryPostProcessor;
List<String> topics = Arrays.asList(topicNames.split(Pattern.quote(",")));
topics.forEach(key -> {
beanDefinitionRegistry.registerBeanDefinition(KafkaConsumerConfiguration.DEFAULT_KAFKA_CONSUMER_FACTORY_NAME + key,
BeanDefinitionBuilder.genericBeanDefinition(KafkaConsumerFactory.class)
.getBeanDefinition());
beanDefinitionRegistry.registerBeanDefinition(KafkaConsumerConfiguration.CONTAINER_FACTORY_NAME + key,
BeanDefinitionBuilder.genericBeanDefinition(ConcurrentKafkaListenerContainerFactory.class)
.getBeanDefinition());
});
log.info("Topics bean defined {}", topics.size());
};
}
此致!