BeanFactory with Event listener

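I am creating beans dynamically with ConfigurableListableBeanFactory and its registerSingleton method, and afterwards I want to listen for events with an EventListener. In the example below, the Kafka listener receives messages correctly, but the EventListener is never triggered. This is because BeanFactory does not support ApplicationEvents, so how can I add an EventListener to a BeanFactory?

Regards!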

https://docs.spring.io/spring-framework/docs/4.3.9.RELEASE/spring-framework-reference/html/beans.html#beans-definition

        @PostConstruct
        public void setup() {
            final ConfigurableListableBeanFactory beanFactory =
                    ((ConfigurableApplicationContext) applicationContext).getBeanFactory();

            // Register one consumer factory and one listener container factory per configured topic.
            kafkaConfiguration.getTopics().forEach((key, topic) -> {
                DefaultKafkaConsumerFactory<String, KafkaEntity> defaultKafkaConsumerFactory =
                        new DefaultKafkaConsumerFactory<>(
                                consumerConfigs(),
                                new StringDeserializer(),
                                new JsonDeserializer<>(topic.getClazz()));

                beanFactory.registerSingleton(key, defaultKafkaConsumerFactory);

                ConcurrentKafkaListenerContainerFactory<String, KafkaEntity> concurrentKafkaListenerContainerFactory =
                        new ConcurrentKafkaListenerContainerFactory<>();
                concurrentKafkaListenerContainerFactory.setConsumerFactory(defaultKafkaConsumerFactory);
                concurrentKafkaListenerContainerFactory.setConcurrency(topic.getConcurrency());
                concurrentKafkaListenerContainerFactory.setAutoStartup(topic.isAutoStart());
                concurrentKafkaListenerContainerFactory.getContainerProperties()
                        .setPollTimeout(kafkaConfiguration.getPollTimeout());
                // Publish a ListenerContainerIdleEvent after 1000 ms without records.
                concurrentKafkaListenerContainerFactory.getContainerProperties().setIdleEventInterval(1000L);

                beanFactory.registerSingleton(CONTAINER_FACTORY_NAME + key, concurrentKafkaListenerContainerFactory);
            });
        }

        @KafkaListener(id = "kfktest", topics = "data_common-apibridge-service.board", idIsGroup = false,
                containerFactory = KafkaConsumerConfiguration.CONTAINER_FACTORY_NAME + "BOARD")
        public void listen(@Payload(required = false) BoardMessage message,
                           @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
            // TODO message handler
        }

        @EventListener(condition = "event.listenerId.startsWith('kfktest-')")
        public void eventHandler(ListenerContainerIdleEvent event) {
                log.info(event.getListenerId());
        }

After some reading I found out how to do this. I use a BeanFactoryPostProcessor to dynamically define all the beans I have in the config server. After that, I finish configuring the beans with @PostConstruct in another @Configuration class, so that some dependencies have already been instantiated. The BeanFactoryPostProcessor looks like this:

    @Bean
    public static BeanFactoryPostProcessor beanFactoryPostProcessor(@Value("${kafka.topicNames}") String topicNames) {
        // The lambda parameter is the bean factory being post-processed.
        return beanFactory -> {

            BeanDefinitionRegistry beanDefinitionRegistry = (BeanDefinitionRegistry) beanFactory;

            // kafka.topicNames is a comma-separated list of topic keys.
            List<String> topics = Arrays.asList(topicNames.split(","));
            topics.forEach(key -> {
                // Register definitions only; the container instantiates the beans later.
                beanDefinitionRegistry.registerBeanDefinition(KafkaConsumerConfiguration.DEFAULT_KAFKA_CONSUMER_FACTORY_NAME + key,
                    BeanDefinitionBuilder.genericBeanDefinition(KafkaConsumerFactory.class)
                        .getBeanDefinition());

                beanDefinitionRegistry.registerBeanDefinition(KafkaConsumerConfiguration.CONTAINER_FACTORY_NAME + key,
                    BeanDefinitionBuilder.genericBeanDefinition(ConcurrentKafkaListenerContainerFactory.class)
                        .getBeanDefinition());
            });
            log.info("Topics bean defined {}", topics.size());
        };
    }
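
A likely reason this works where registerSingleton did not: Spring documents that registerSingleton expects a fully initialized instance and performs no initialization callbacks on it, whereas beans created from a bean definition go through the normal lifecycle, including Aware callbacks such as the one that hands a bean its event publisher. The following is only a toy model of that difference in plain Java, not Spring's implementation. All names in it (MiniContainer, PublisherAware, IdleSource) are hypothetical and exist purely for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Simplified model, NOT Spring code: every type below is a hypothetical stand-in.
class LifecycleDemo {

    /** Stand-in for an Aware-style callback such as ApplicationEventPublisherAware. */
    interface PublisherAware {
        void setPublisher(Consumer<String> publisher);
    }

    /** Stand-in for a component that emits idle events if it was given a publisher. */
    static class IdleSource implements PublisherAware {
        private final String name;
        private Consumer<String> publisher; // stays null unless a container injects it

        IdleSource(String name) {
            this.name = name;
        }

        @Override
        public void setPublisher(Consumer<String> publisher) {
            this.publisher = publisher;
        }

        void fireIdle() {
            if (publisher != null) {
                publisher.accept(name + ":idle");
            }
        }
    }

    /** Toy container with two registration paths. */
    static class MiniContainer {
        private final Map<String, Object> singletons = new HashMap<>();
        private final Map<String, Supplier<?>> definitions = new HashMap<>();
        final List<String> receivedEvents = new ArrayList<>();

        // Path 1: a finished instance is stored as-is; no callbacks are applied.
        void registerSingleton(String name, Object instance) {
            singletons.put(name, instance);
        }

        // Path 2: only a recipe is stored; the container builds the bean on first lookup.
        void registerDefinition(String name, Supplier<?> factory) {
            definitions.put(name, factory);
        }

        Object getBean(String name) {
            return singletons.computeIfAbsent(name, n -> {
                Object bean = definitions.get(n).get();
                // Lifecycle callback: only container-created beans receive the publisher.
                if (bean instanceof PublisherAware) {
                    ((PublisherAware) bean).setPublisher(receivedEvents::add);
                }
                return bean;
            });
        }
    }

    /** Fires one idle event from each kind of bean and returns what was delivered. */
    static List<String> run() {
        MiniContainer container = new MiniContainer();
        container.registerSingleton("manual", new IdleSource("manual"));
        container.registerDefinition("managed", () -> new IdleSource("managed"));

        ((IdleSource) container.getBean("manual")).fireIdle();  // no publisher, event is dropped
        ((IdleSource) container.getBean("managed")).fireIdle(); // publisher was injected
        return container.receivedEvents;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model only the bean built from a definition delivers its event. If the same mechanism is at play in the question, the container factories registered with registerSingleton never received an event publisher, so no ListenerContainerIdleEvent reached the EventListener.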

Regards!