使用 InboundChannelAdapter 动态创建 SftpInboundFileSynchronizingMessageSource 的多个 beans

Create multiple beans of SftpInboundFileSynchronizingMessageSource dynamically with InboundChannelAdapter

我正在使用 spring 入站通道适配器从 sftp 服务器轮询文件。应用程序需要从单个 sftp 服务器的多个目录进行轮询。由于入站通道适配器不允许轮询多个目录,因此我尝试创建多个具有不同值的相同类型的 bean。由于将来目录数量可能会增加,我想从应用程序属性中控制它并动态注册 bean。

我的代码-

@Override
 public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

   beanFactory.registerSingleton("sftpSessionFactory", sftpSessionFactory(host, port, user, password));
   beanFactory.registerSingleton("sftpInboundFileSynchronizer",
       sftpInboundFileSynchronizer((SessionFactory) beanFactory.getBean("sftpSessionFactory")));
 }
  public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory(String host, String port, String user, String password) {

    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    factory.setHost(host);
    factory.setPort(Integer.parseInt(port));
    factory.setUser(user);
    factory.setPassword(password);
    factory.setAllowUnknownKeys(true);

    return new CachingSessionFactory<>(factory);
  }
  private SftpInboundFileSynchronizer sftpInboundFileSynchronizer(SessionFactory sessionFactory) {

    SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sessionFactory);
    fileSynchronizer.setDeleteRemoteFiles(true);
    fileSynchronizer.setPreserveTimestamp(true);
    fileSynchronizer.setRemoteDirectory("/mydir/subdir);
    fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.pdf"));

    return fileSynchronizer;
  }
  @Bean
  @InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "2000"))
  public MessageSource<File> sftpMessageSource(String s) {
    SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(
        (AbstractInboundFileSynchronizer<ChannelSftp.LsEntry>) applicationContext.getBean("sftpInboundFileSynchronizer"));
    source.setLocalDirectory(new File("/dir/subdir"));
    source.setAutoCreateLocalDirectory(true);
    source.setLocalFilter(new AcceptOnceFileListFilter<>());
    source.setMaxFetchSize(Integer.parseInt(maxFetchSize));
    source.setAutoCreateLocalDirectory(true);

    return source;
  }

  @Bean
  @ServiceActivator(inputChannel = "sftpChannel")
  public MessageHandler handler() {
    return message -> {
      LOGGER.info("Payload - {}", message.getPayload());
    };
  }

这段代码工作正常。但是如果我动态创建 sftpMessageSource,那么 @InboundChannelAdapter 注释将不起作用。请建议一种动态创建 sftpMessageSourcehandler bean 并添加相应注释的方法。

更新:

以下代码有效:

  @PostConstruct
  void init() {
    int index = 0;
    for (String directory : directories) {
      index++;
      int finalI = index;
      IntegrationFlow flow = IntegrationFlows
          .from(Sftp.inboundAdapter(sftpSessionFactory())
                  .preserveTimestamp(true)
                  .remoteDirectory(directory)
                  .autoCreateLocalDirectory(true)
                  .localDirectory(new File("/" + directory))
                  .localFilter(new AcceptOnceFileListFilter<>())
                  .maxFetchSize(10)
                  .filter(new SftpSimplePatternFileListFilter("*.pdf"))
                  .deleteRemoteFiles(true),
              e -> e.id("sftpInboundAdapter" + finalI)
                  .autoStartup(true)
                  .poller(Pollers.fixedDelay(2000)))
          .handle(handler())
          .get();

      this.flowContext.registration(flow).register();
    }
  }
  @Bean
  public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {

    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    factory.setHost(host);
    factory.setPort(Integer.parseInt(port));
    factory.setUser(user);
    factory.setPassword(password);
    factory.setAllowUnknownKeys(true);

    return new CachingSessionFactory<>(factory);
  }

Java 中的注释是 static。您不能在运行时为创建的对象添加它们。此外,框架会在应用程序上下文启动时读取这些注释。因此,使用 Java 作为语言本身是不可能的。

您需要考虑在 Spring 集成中切换到 Java DSL,以便能够使用其“动态流”:https://docs.spring.io/spring-integration/docs/5.3.1.RELEASE/reference/html/dsl.html#java-dsl-runtime-flows

但是,请首先多研究一下 Java 可以做什么,不能做什么。