如何在基于SftpRemoteFileTemplate的SFTP场景中集成一个MessageHandler?
How to integrate a MessageHandler into the SFTP scenario based on SftpRemoteFileTemplate?
我已经基于 Spring 的集成包中的 SftpRemoteFileTemplate 实现了从 SFTP 服务器获取文件、将文件放入和删除文件的服务。
此处 sftpGetPayload 从 SFTP 服务器获取文件并传送其内容。
到目前为止,这是我的代码:
public String sftpGetPayload(String sessionId,
String host, int port, String user, String password,
String remoteDir, String remoteFilename, boolean remoteRemove) {
LOG.info("sftpGetPayload sessionId={}", sessionId);
LOG.debug("sftpGetPayLoad host={}, port={}, user={}", host, port, user);
LOG.debug("sftpGetPayload remoteDir={}, remoteFilename={}, remoteRemove={}",
remoteDir, remoteFilename, remoteRemove);
final AtomicReference<String> refPayload = new AtomicReference<>();
SftpRemoteFileTemplate template = getSftpRemoteFileTemplate(host, port,
user, password, remoteDir, remoteFilename);
template.get(remoteDir + "/" + remoteFilename,
is -> refPayload.set(getAsString(is)));
LOG.info("sftpGetToFile {} read.", remoteDir + "/" + remoteFilename);
deleteRemoteFile(template, remoteDir, remoteFilename, remoteRemove);
return refPayload.get();
}
private SftpRemoteFileTemplate getSftpRemoteFileTemplate(String host, int port,
String user, String password, String remoteDir, String remoteFilename) {
SftpRemoteFileTemplate template =
new SftpRemoteFileTemplate(sftpSessionFactory(host, port, user, password));
template.setFileNameExpression(
new LiteralExpression(remoteDir + "/" + remoteFilename));
template.setRemoteDirectoryExpression(new LiteralExpression(remoteDir));
//template.afterPropertiesSet();
return template;
}
private void deleteRemoteFile(SftpRemoteFileTemplate template,
String remoteDir, String remoteFilename, boolean remoteRemove) {
LOG.debug("deleteRemoteFile remoteRemove={}", remoteRemove);
if (remoteRemove) {
template.remove(remoteDir + "/" + remoteFilename);
LOG.info("sftpGetToFile {} removed.", remoteDir + "/" + remoteFilename);
}
}
所有这些 GET 操作都是主动操作,这意味着要获取的文件被认为已经存在。我想要一种轮询过程,一旦在 SFTP 服务器上收到文件,它就会调用我的有效负载消耗方法。
我找到了另一个基于Spring beans的实现,配置为Spring Integration Dsl,它声明了一个SftpSessionFactory,一个 SftpInboundFileSynchronizer、一个 SftpMessageSource 和一个 MessageHandler,它轮询 SFTP 站点以接收文件并自动启动消息处理程序进一步处理。
这段代码如下:
@Bean
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(myHost);
factory.setPort(myPort);
factory.setUser(myUser);
factory.setPassword(myPassword);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<LsEntry>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory(myRemotePath);
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter(myFileFilter));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(
sftpInboundFileSynchronizer());
source.setLocalDirectory(myLocalDirectory);
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
如何将这个@Poller/MessageHandler/@ServiceActivator 概念包含到我上面的实现中?或者有没有办法在基于模板的实现中实现这个特性?
场景可能如下:
我有一个 Spring 引导应用程序,其中有几个 类 代表任务。其中一些任务是通过 Spring @Scheduled 注释和 CRON 规范自动调用的,其他任务则不是。
@Scheduled(cron = "${task.to.start.automatically.frequency}")
public void runAsTask() {
...
}
第一个任务将从 ist @Sheduled 规范开始,从 SFTP 服务器获取文件并处理它。它将使用自己的通道(host1、port1、user1、password1、remoteDir1、remoteFile1)执行此操作。
第二个任务也将由调度程序 运行 生成一些东西放到 SFTP 服务器上。它将使用自己的通道(host2、port2、user2、password2、remoteDir2、remoteFile2)执行此操作。很可能 host2 = host1 和 port2 = port1,但这不是必须的。
第三个任务也将由调度程序 运行 生成一些东西放到 SFTP 服务器上。它将使用与 task1 相同的通道来执行此操作,但此任务是生产者(而不是像 task1 那样的消费者)并将写入除 task1 之外的另一个文件(host1、port1、user1、password1、remoteDir3、remoteFile3)。
任务四没有 @Scheduled 注释,因为它应该意识到何时从第三方接收到它必须处理的文件,因此可以在其通道(host4、port4、user4、password4、remoteDir4、remoteFile4)上获取其内容以进行处理它。
我已经阅读了整个集成内容,但是很难针对这个用例进行转换,无论是从 XML 配置方案到带注释的 Java 配置,还是通过静态 Spring 在 运行 时间将 bean 方法转换为动态方法。
我了解使用 IntegrationFlow 来注册人工制品,task1 的入站适配器,task2 的出站适配器,task3 的入站适配器与 task1 的会话工厂相同(在其他任何地方注册),以及 - 最后但尤其重要的是 - 具有 task4 轮询器功能的入站适配器。
或者它们都应该是具有命令功能的网关吗?或者我应该注册 SftpRemoteFileTemplate 吗?
要定义频道我有:
public class TransferChannel {
private String host;
private int port;
private String user;
private String password;
/* getters, setters, hash, equals, and toString */
}
要将所有 SFTP 设置放在一起,我有:
public class TransferContext {
private boolean enabled;
private TransferChannel channel;
private String remoteDir;
private String remoteFilename;
private boolean remoteRemove;
private String remoteFilenameFilter;
private String localDir;
/* getters, setters, hash, equals, and toString */
}
作为 SFTP 处理的核心,每个作业都会注入一种 DynamicSftpAdapter:
@Scheduled(cron = "${task.to.start.automatically.frequency}")
public void runAsTask() {
@Autowired
DynamicSftpAdapter sftp;
...
sftp.connect("Task1", context);
File f = sftp.getFile("Task1", "remoteDir", "remoteFile");
/* process file content */
sftp.removeFile("Task1", "remoteDir", "remoteFile");
sftp.disconnect("Task1", context);
}
DynamicSftpAdapter 还只是一个片段:
@Component
public class DynamicSftpAdapter {
private static final Logger LOG = LoggerFactory.getLogger(DynamicTcpServer.class);
@Autowired
private IntegrationFlowContext flowContext;
@Autowired
private ApplicationContext appContext;
private final Map<TransferChannel, IntegrationFlowRegistration> registrations = new HashMap<>();
private final Map<String, TransferContext> sessions = new ConcurrentHashMap<>();
@Override
public void connect(String sessionId, TransferContext context) {
if (this.registrations.containsKey(context.getChannel())) {
LOG.debug("connect, channel exists for {}", sessionId);
}
else {
// register the required SFTP Outbound Adapter
TransferChannel channel = context.getChannel();
IntegrationFlow flow = f -> f.handle(Sftp.outboundAdapter(cashedSftpSessionFactory(
channel.getHost(), channel.getPort(),
channel.getUser(), channel.getPassword())));
this.registrations.put(channel, flowContext.registration(flow).register());
this.sessions.put(sessionId, context);
LOG.info("sftp session {} for {} started", sessionId, context);
}
}
private DefaultSftpSessionFactory sftpSessionFactory(String host, int port, String user, String password) {
LOG.debug("sftpSessionFactory");
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(host);
factory.setPort(port);
factory.setUser(user);
factory.setPassword(password);
factory.setAllowUnknownKeys(true);
return factory;
}
private CachingSessionFactory<LsEntry> cashedSftpSessionFactory(String host, int port, String user, String password) {
LOG.debug("cashedSftpSessionFactory");
CachingSessionFactory<LsEntry> cashedSessionFactory =
new CachingSessionFactory<LsEntry>(
sftpSessionFactory(host, port, user, password));
return cashedSessionFactory;
}
@Override
public void sftpGetFile(String sessionId, String remoteDir, String remoteFilename) {
TransferContext context = sessions.get(sessionId);
if (context == null)
throw new IllegalStateException("Session not established, sessionId " + sessionId);
IntegrationFlowRegistration register = registrations.get(context.getChannel());
if (register != null) {
try {
LOG.debug("sftpGetFile get file {}", remoteDir + "/" + remoteFilename);
register.getMessagingTemplate().send(
MessageBuilder.withPayload(msg)
.setHeader(...).build());
}
catch (Exception e) {
appContext.getBean(context, DefaultSftpSessionFactory.class)
.close();
}
}
}
@Override
public void disconnect(String sessionId, TransferContext context) {
IntegrationFlowRegistration registration = this.registrations.remove(context.getChannel());
if (registration != null) {
registration.destroy();
}
LOG.info("sftp session for {} finished", context);
}
}
我不知道如何启动 SFTP 命令。当使用 OutboundGateway 并且必须立即指定 SFTP 命令(如 GET)时,我也没有得到,那么整个 SFTP 处理是否会在一个方法中,指定出站网关工厂并使用 get() 获取实例并可能调用消息 .get() 以任何方式。
显然我需要帮助。
首先,如果您已经在使用 Spring 集成通道适配器,则可能没有理由直接使用像 RemoteFileTemplate
这样的低级 API。
其次存在技术差异:SftpInboundFileSynchronizingMessageSource
将生成本地文件 - 远程文件的完整副本。因此,当我们在下游处理您的 SftpRemoteFileTemplate
逻辑时,它不会很好地工作,因为我们已经带来了一个本地文件 (java.io.File
),而不是远程文件表示的实体。
即使您在 sftpGetPayload()
中的逻辑看起来不像需要这样一个单独的方法那么复杂和自定义,最好将 SftpRemoteFileTemplate
作为单例并共享它当您使用同一个 SFTP 服务器时,不同组件之间。它只是无状态的直接 Spring 模板模式实现。
如果您仍然坚持使用上述集成流程中的方法,您应该考虑为该 @ServiceActivator(inputChannel = "sftpChannel")
调用一个 POJO 方法。在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/configuration.html#annotations.
您还可能会发现 SFTP 出站网关是对您的用例有用的组件。它有一些常见的场景实现:https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#sftp-outbound-gateway
我已经基于 Spring 的集成包中的 SftpRemoteFileTemplate 实现了从 SFTP 服务器获取文件、将文件放入和删除文件的服务。
此处 sftpGetPayload 从 SFTP 服务器获取文件并传送其内容。
到目前为止,这是我的代码:
public String sftpGetPayload(String sessionId,
String host, int port, String user, String password,
String remoteDir, String remoteFilename, boolean remoteRemove) {
LOG.info("sftpGetPayload sessionId={}", sessionId);
LOG.debug("sftpGetPayLoad host={}, port={}, user={}", host, port, user);
LOG.debug("sftpGetPayload remoteDir={}, remoteFilename={}, remoteRemove={}",
remoteDir, remoteFilename, remoteRemove);
final AtomicReference<String> refPayload = new AtomicReference<>();
SftpRemoteFileTemplate template = getSftpRemoteFileTemplate(host, port,
user, password, remoteDir, remoteFilename);
template.get(remoteDir + "/" + remoteFilename,
is -> refPayload.set(getAsString(is)));
LOG.info("sftpGetToFile {} read.", remoteDir + "/" + remoteFilename);
deleteRemoteFile(template, remoteDir, remoteFilename, remoteRemove);
return refPayload.get();
}
private SftpRemoteFileTemplate getSftpRemoteFileTemplate(String host, int port,
String user, String password, String remoteDir, String remoteFilename) {
SftpRemoteFileTemplate template =
new SftpRemoteFileTemplate(sftpSessionFactory(host, port, user, password));
template.setFileNameExpression(
new LiteralExpression(remoteDir + "/" + remoteFilename));
template.setRemoteDirectoryExpression(new LiteralExpression(remoteDir));
//template.afterPropertiesSet();
return template;
}
private void deleteRemoteFile(SftpRemoteFileTemplate template,
String remoteDir, String remoteFilename, boolean remoteRemove) {
LOG.debug("deleteRemoteFile remoteRemove={}", remoteRemove);
if (remoteRemove) {
template.remove(remoteDir + "/" + remoteFilename);
LOG.info("sftpGetToFile {} removed.", remoteDir + "/" + remoteFilename);
}
}
所有这些 GET 操作都是主动操作,这意味着要获取的文件被认为已经存在。我想要一种轮询过程,一旦在 SFTP 服务器上收到文件,它就会调用我的有效负载消耗方法。
我找到了另一个基于Spring beans的实现,配置为Spring Integration Dsl,它声明了一个SftpSessionFactory,一个 SftpInboundFileSynchronizer、一个 SftpMessageSource 和一个 MessageHandler,它轮询 SFTP 站点以接收文件并自动启动消息处理程序进一步处理。
这段代码如下:
@Bean
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(myHost);
factory.setPort(myPort);
factory.setUser(myUser);
factory.setPassword(myPassword);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<LsEntry>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory(myRemotePath);
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter(myFileFilter));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(
sftpInboundFileSynchronizer());
source.setLocalDirectory(myLocalDirectory);
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
如何将这个@Poller/MessageHandler/@ServiceActivator 概念包含到我上面的实现中?或者有没有办法在基于模板的实现中实现这个特性?
场景可能如下:
我有一个 Spring 引导应用程序,其中有几个 类 代表任务。其中一些任务是通过 Spring @Scheduled 注释和 CRON 规范自动调用的,其他任务则不是。
@Scheduled(cron = "${task.to.start.automatically.frequency}")
public void runAsTask() {
...
}
第一个任务将从 ist @Sheduled 规范开始,从 SFTP 服务器获取文件并处理它。它将使用自己的通道(host1、port1、user1、password1、remoteDir1、remoteFile1)执行此操作。 第二个任务也将由调度程序 运行 生成一些东西放到 SFTP 服务器上。它将使用自己的通道(host2、port2、user2、password2、remoteDir2、remoteFile2)执行此操作。很可能 host2 = host1 和 port2 = port1,但这不是必须的。 第三个任务也将由调度程序 运行 生成一些东西放到 SFTP 服务器上。它将使用与 task1 相同的通道来执行此操作,但此任务是生产者(而不是像 task1 那样的消费者)并将写入除 task1 之外的另一个文件(host1、port1、user1、password1、remoteDir3、remoteFile3)。 任务四没有 @Scheduled 注释,因为它应该意识到何时从第三方接收到它必须处理的文件,因此可以在其通道(host4、port4、user4、password4、remoteDir4、remoteFile4)上获取其内容以进行处理它。
我已经阅读了整个集成内容,但是很难针对这个用例进行转换,无论是从 XML 配置方案到带注释的 Java 配置,还是通过静态 Spring 在 运行 时间将 bean 方法转换为动态方法。
我了解使用 IntegrationFlow 来注册人工制品,task1 的入站适配器,task2 的出站适配器,task3 的入站适配器与 task1 的会话工厂相同(在其他任何地方注册),以及 - 最后但尤其重要的是 - 具有 task4 轮询器功能的入站适配器。 或者它们都应该是具有命令功能的网关吗?或者我应该注册 SftpRemoteFileTemplate 吗?
要定义频道我有:
public class TransferChannel {
private String host;
private int port;
private String user;
private String password;
/* getters, setters, hash, equals, and toString */
}
要将所有 SFTP 设置放在一起,我有:
public class TransferContext {
private boolean enabled;
private TransferChannel channel;
private String remoteDir;
private String remoteFilename;
private boolean remoteRemove;
private String remoteFilenameFilter;
private String localDir;
/* getters, setters, hash, equals, and toString */
}
作为 SFTP 处理的核心,每个作业都会注入一种 DynamicSftpAdapter:
@Scheduled(cron = "${task.to.start.automatically.frequency}")
public void runAsTask() {
@Autowired
DynamicSftpAdapter sftp;
...
sftp.connect("Task1", context);
File f = sftp.getFile("Task1", "remoteDir", "remoteFile");
/* process file content */
sftp.removeFile("Task1", "remoteDir", "remoteFile");
sftp.disconnect("Task1", context);
}
DynamicSftpAdapter 还只是一个片段:
@Component
public class DynamicSftpAdapter {
private static final Logger LOG = LoggerFactory.getLogger(DynamicTcpServer.class);
@Autowired
private IntegrationFlowContext flowContext;
@Autowired
private ApplicationContext appContext;
private final Map<TransferChannel, IntegrationFlowRegistration> registrations = new HashMap<>();
private final Map<String, TransferContext> sessions = new ConcurrentHashMap<>();
@Override
public void connect(String sessionId, TransferContext context) {
if (this.registrations.containsKey(context.getChannel())) {
LOG.debug("connect, channel exists for {}", sessionId);
}
else {
// register the required SFTP Outbound Adapter
TransferChannel channel = context.getChannel();
IntegrationFlow flow = f -> f.handle(Sftp.outboundAdapter(cashedSftpSessionFactory(
channel.getHost(), channel.getPort(),
channel.getUser(), channel.getPassword())));
this.registrations.put(channel, flowContext.registration(flow).register());
this.sessions.put(sessionId, context);
LOG.info("sftp session {} for {} started", sessionId, context);
}
}
private DefaultSftpSessionFactory sftpSessionFactory(String host, int port, String user, String password) {
LOG.debug("sftpSessionFactory");
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(host);
factory.setPort(port);
factory.setUser(user);
factory.setPassword(password);
factory.setAllowUnknownKeys(true);
return factory;
}
private CachingSessionFactory<LsEntry> cashedSftpSessionFactory(String host, int port, String user, String password) {
LOG.debug("cashedSftpSessionFactory");
CachingSessionFactory<LsEntry> cashedSessionFactory =
new CachingSessionFactory<LsEntry>(
sftpSessionFactory(host, port, user, password));
return cashedSessionFactory;
}
@Override
public void sftpGetFile(String sessionId, String remoteDir, String remoteFilename) {
TransferContext context = sessions.get(sessionId);
if (context == null)
throw new IllegalStateException("Session not established, sessionId " + sessionId);
IntegrationFlowRegistration register = registrations.get(context.getChannel());
if (register != null) {
try {
LOG.debug("sftpGetFile get file {}", remoteDir + "/" + remoteFilename);
register.getMessagingTemplate().send(
MessageBuilder.withPayload(msg)
.setHeader(...).build());
}
catch (Exception e) {
appContext.getBean(context, DefaultSftpSessionFactory.class)
.close();
}
}
}
@Override
public void disconnect(String sessionId, TransferContext context) {
IntegrationFlowRegistration registration = this.registrations.remove(context.getChannel());
if (registration != null) {
registration.destroy();
}
LOG.info("sftp session for {} finished", context);
}
}
我不知道如何启动 SFTP 命令。当使用 OutboundGateway 并且必须立即指定 SFTP 命令(如 GET)时,我也没有得到,那么整个 SFTP 处理是否会在一个方法中,指定出站网关工厂并使用 get() 获取实例并可能调用消息 .get() 以任何方式。
显然我需要帮助。
首先,如果您已经在使用 Spring 集成通道适配器,则可能没有理由直接使用像 RemoteFileTemplate
这样的低级 API。
其次存在技术差异:SftpInboundFileSynchronizingMessageSource
将生成本地文件 - 远程文件的完整副本。因此,当我们在下游处理您的 SftpRemoteFileTemplate
逻辑时,它不会很好地工作,因为我们已经带来了一个本地文件 (java.io.File
),而不是远程文件表示的实体。
即使您在 sftpGetPayload()
中的逻辑看起来不像需要这样一个单独的方法那么复杂和自定义,最好将 SftpRemoteFileTemplate
作为单例并共享它当您使用同一个 SFTP 服务器时,不同组件之间。它只是无状态的直接 Spring 模板模式实现。
如果您仍然坚持使用上述集成流程中的方法,您应该考虑为该 @ServiceActivator(inputChannel = "sftpChannel")
调用一个 POJO 方法。在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/configuration.html#annotations.
您还可能会发现 SFTP 出站网关是对您的用例有用的组件。它有一些常见的场景实现:https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#sftp-outbound-gateway