使用 InboundChannelAdapter 动态创建 SftpInboundFileSynchronizingMessageSource 的多个 beans
Create multiple beans of SftpInboundFileSynchronizingMessageSource dynamically with InboundChannelAdapter
我正在使用 spring 入站通道适配器从 sftp 服务器轮询文件。应用程序需要从单个 sftp 服务器的多个目录进行轮询。由于入站通道适配器不允许轮询多个目录,因此我尝试创建多个具有不同值的相同类型的 bean。由于将来目录数量可能会增加,我想从应用程序属性中控制它并动态注册 bean。
我的代码-
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
beanFactory.registerSingleton("sftpSessionFactory", sftpSessionFactory(host, port, user, password));
beanFactory.registerSingleton("sftpInboundFileSynchronizer",
sftpInboundFileSynchronizer((SessionFactory) beanFactory.getBean("sftpSessionFactory")));
}
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory(String host, String port, String user, String password) {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(host);
factory.setPort(Integer.parseInt(port));
factory.setUser(user);
factory.setPassword(password);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(factory);
}
private SftpInboundFileSynchronizer sftpInboundFileSynchronizer(SessionFactory sessionFactory) {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sessionFactory);
fileSynchronizer.setDeleteRemoteFiles(true);
fileSynchronizer.setPreserveTimestamp(true);
fileSynchronizer.setRemoteDirectory("/mydir/subdir);
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.pdf"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "2000"))
public MessageSource<File> sftpMessageSource(String s) {
SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(
(AbstractInboundFileSynchronizer<ChannelSftp.LsEntry>) applicationContext.getBean("sftpInboundFileSynchronizer"));
source.setLocalDirectory(new File("/dir/subdir"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<>());
source.setMaxFetchSize(Integer.parseInt(maxFetchSize));
source.setAutoCreateLocalDirectory(true);
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return message -> {
LOGGER.info("Payload - {}", message.getPayload());
};
}
这段代码工作正常。但是如果我动态创建 sftpMessageSource,那么 @InboundChannelAdapter 注释将不起作用。请建议一种动态创建 sftpMessageSource
和 handler
bean 并添加相应注释的方法。
更新:
以下代码有效:
@PostConstruct
void init() {
int index = 0;
for (String directory : directories) {
index++;
int finalI = index;
IntegrationFlow flow = IntegrationFlows
.from(Sftp.inboundAdapter(sftpSessionFactory())
.preserveTimestamp(true)
.remoteDirectory(directory)
.autoCreateLocalDirectory(true)
.localDirectory(new File("/" + directory))
.localFilter(new AcceptOnceFileListFilter<>())
.maxFetchSize(10)
.filter(new SftpSimplePatternFileListFilter("*.pdf"))
.deleteRemoteFiles(true),
e -> e.id("sftpInboundAdapter" + finalI)
.autoStartup(true)
.poller(Pollers.fixedDelay(2000)))
.handle(handler())
.get();
this.flowContext.registration(flow).register();
}
}
@Bean
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(host);
factory.setPort(Integer.parseInt(port));
factory.setUser(user);
factory.setPassword(password);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(factory);
}
Java 中的注释是 static。您不能在运行时为创建的对象添加它们。此外,框架会在应用程序上下文启动时读取这些注释。因此,使用 Java 作为语言本身是不可能的。
您需要考虑在 Spring 集成中切换到 Java DSL,以便能够使用其“动态流”:https://docs.spring.io/spring-integration/docs/5.3.1.RELEASE/reference/html/dsl.html#java-dsl-runtime-flows。
但是,请首先多研究一下 Java 可以做什么,不能做什么。
我正在使用 spring 入站通道适配器从 sftp 服务器轮询文件。应用程序需要从单个 sftp 服务器的多个目录进行轮询。由于入站通道适配器不允许轮询多个目录,因此我尝试创建多个具有不同值的相同类型的 bean。由于将来目录数量可能会增加,我想从应用程序属性中控制它并动态注册 bean。
我的代码-
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
beanFactory.registerSingleton("sftpSessionFactory", sftpSessionFactory(host, port, user, password));
beanFactory.registerSingleton("sftpInboundFileSynchronizer",
sftpInboundFileSynchronizer((SessionFactory) beanFactory.getBean("sftpSessionFactory")));
}
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory(String host, String port, String user, String password) {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(host);
factory.setPort(Integer.parseInt(port));
factory.setUser(user);
factory.setPassword(password);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(factory);
}
private SftpInboundFileSynchronizer sftpInboundFileSynchronizer(SessionFactory sessionFactory) {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sessionFactory);
fileSynchronizer.setDeleteRemoteFiles(true);
fileSynchronizer.setPreserveTimestamp(true);
fileSynchronizer.setRemoteDirectory("/mydir/subdir);
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.pdf"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "2000"))
public MessageSource<File> sftpMessageSource(String s) {
SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(
(AbstractInboundFileSynchronizer<ChannelSftp.LsEntry>) applicationContext.getBean("sftpInboundFileSynchronizer"));
source.setLocalDirectory(new File("/dir/subdir"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<>());
source.setMaxFetchSize(Integer.parseInt(maxFetchSize));
source.setAutoCreateLocalDirectory(true);
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return message -> {
LOGGER.info("Payload - {}", message.getPayload());
};
}
这段代码工作正常。但是如果我动态创建 sftpMessageSource,那么 @InboundChannelAdapter 注释将不起作用。请建议一种动态创建 sftpMessageSource
和 handler
bean 并添加相应注释的方法。
更新:
以下代码有效:
@PostConstruct
void init() {
int index = 0;
for (String directory : directories) {
index++;
int finalI = index;
IntegrationFlow flow = IntegrationFlows
.from(Sftp.inboundAdapter(sftpSessionFactory())
.preserveTimestamp(true)
.remoteDirectory(directory)
.autoCreateLocalDirectory(true)
.localDirectory(new File("/" + directory))
.localFilter(new AcceptOnceFileListFilter<>())
.maxFetchSize(10)
.filter(new SftpSimplePatternFileListFilter("*.pdf"))
.deleteRemoteFiles(true),
e -> e.id("sftpInboundAdapter" + finalI)
.autoStartup(true)
.poller(Pollers.fixedDelay(2000)))
.handle(handler())
.get();
this.flowContext.registration(flow).register();
}
}
@Bean
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(host);
factory.setPort(Integer.parseInt(port));
factory.setUser(user);
factory.setPassword(password);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(factory);
}
Java 中的注释是 static。您不能在运行时为创建的对象添加它们。此外,框架会在应用程序上下文启动时读取这些注释。因此,使用 Java 作为语言本身是不可能的。
您需要考虑在 Spring 集成中切换到 Java DSL,以便能够使用其“动态流”:https://docs.spring.io/spring-integration/docs/5.3.1.RELEASE/reference/html/dsl.html#java-dsl-runtime-flows。
但是,请首先多研究一下 Java 可以做什么,不能做什么。