Spring 在入站适配器消息处理程序中使用出站网关的 Sftp 获取文件
Spring Sftp fetch file using outbound gateway within inbound adapter message handler
我正在使用使用 Java DSL 的入站适配器从 SFTP 服务器轮询 pdf 文件。我有一个用例,在获取 pdf 文件后,应用程序将获取 SFTP 服务器上具有相同名称的 CSV 格式的配置文件。获取配置文件后,应用程序将使用配置文件中定义的属性处理原始 pdf 文件,并使用出站适配器将其上传回 SFTP 服务器。
我在使用出站网关在同一线程的处理程序中获取配置文件时遇到问题。
这是我的代码:
注册集成流程:
for (String client : clientsArr) {
this.flowContext.registration(getInboundIntegrationFlow(client)).register();
}
this.flowContext.registration(getOutboundIntegrationFlow()).register();
this.flowContext.registration(sftpGatewayGetIntegrationFlow()).register();
入站适配器集成流程:
@Autowired
private SftpPdfMessageHandler messageHandler;
private IntegrationFlow getInboundIntegrationFlow(String client) {
String remoteDirectory = getRemoteDirectory(client);
String localDirectory = getLocalDirectory(client);
String inboundAdapterId = getInboundAdapterId(client);
return IntegrationFlows
.from(Sftp.inboundAdapter(sftpSessionFactory())
.preserveTimestamp(true)
.remoteDirectory(remoteDirectory)
.autoCreateLocalDirectory(true)
.localDirectory(new File(localDirectory))
.maxFetchSize(Integer.parseInt(sftpProperties.getMaxFetchSize()))
.filter(new SftpSimplePatternFileListFilter(sftpProperties.getRemoteFileFilter()))
.deleteRemoteFiles(true),
e -> e.id(inboundAdapterId)
.autoStartup(true)
.poller(Pollers
.fixedDelay(Long.parseLong(sftpProperties.getPollPeriodInSeconds()), TimeUnit.SECONDS)
.receiveTimeout(Long.parseLong(sftpProperties.getPollerTimeout()))
.maxMessagesPerPoll(Long.parseLong(sftpProperties.getMaxMessagesPerPoll()))
))
.handle(inBoundHandler())
.get();
}
public MessageHandler inBoundHandler() {
return message -> {
File file = (File) message.getPayload();
messageHandler.handleMessage(file);
};
}
出站适配器集成流程:
private IntegrationFlow getOutboundIntegrationFlow() {
return IntegrationFlows.from("sftpOutboundChannel")
.handle(Sftp.outboundAdapter(sftpSessionFactory(), FileExistsMode.FAIL)
.remoteDirectoryExpression(String.format("headers['%s']", FileHeaders.REMOTE_DIRECTORY))).get();
}
@Bean("sftpOutboundChannel")
public MessageChannel sftpOutboundChannel() {
return new DirectChannel();
}
SFTP 消息处理程序:
@Async("sftpHandlerAsyncExecutor")
public void handleMessage(File originalFile) {
File configFile = fetchConfigFile();
/*
process original file and store processed file in output file path on local directory
*/
boolean success = uploadFileToSftpServer(outputFilePath, client, entity);
if (success) {
deleteFileFromLocal(originalFile);
}
}
出站网关 GET 集成流程:
private IntegrationFlow sftpGatewayGetIntegrationFlow() {
return IntegrationFlows.from("sftpGetInputChannel")
.handle(Sftp.outboundGateway(sftpSessionFactory(),
AbstractRemoteFileOutboundGateway.Command.GET, "payload")
.options(AbstractRemoteFileOutboundGateway.Option.DELETE,
AbstractRemoteFileOutboundGateway.Option.PRESERVE_TIMESTAMP)
.localDirectoryExpression(String.format("headers['%s']", Constants.HEADER_LOCAL_DIRECTORY_NAME))
.autoCreateLocalDirectory(true))
.channel("nullChannel")
.get();
}
@Bean("sftpGetInputChannel")
public MessageChannel sftpGetInputChannel() {
return new DirectChannel();
}
messageHandler.handleMessage()
方法在异步(使用 ThreadPoolTaskExecutor)中调用,它使用出站网关在内部获取配置文件。但是我找不到可以在同一个线程中发送和接收消息负载的单一通道。我在 spring 文档中找到了 MessagingTemplate,但找不到将其与我的出站网关集成流程连接起来的方法。
sftpGetMessageTemplate.sendAndReceive(sftpGetInputChannel, new GenericMessage<>("/dir/file.csv", headers))
使用 DirectChannel() 给出“Dispatcher 没有频道订阅者”异常。
我正在寻找一种解决方案,我可以通过以下任何一种方式从服务器获取所需的文件:
- 使用适当的渠道将 MessagingTemplate 与 IntegrationFlow 集成(如果可能)。
- 入站适配器流中的一些消息处理程序链接,在轮询原始文件后,它将使用 sftp 出站网关获取另一个文件,然后使用两个对象(原始文件和配置文件)调用最终处理程序。我正在尝试使用上面的自定义代码实现类似的事情。
- 在多线程环境中为 GET 命令使用发送和轮询通道的任何其他方式。
Application needs to decide the directory path on runtime while using the GET command.
您可能需要了解什么是 @MessagingGateway
以及如何使其与出站网关上的通道交互。
有关详细信息,请参阅文档:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/messaging-endpoints.html#gateway
如果你真的想得到一个配置文件,你不应该这样做 .channel("nullChannel")
。有了网关,将有 replyChannel
header 和网关填充的 TemporaryReplyChannel
实例。然后在您的代码中,您只需将该功能接口用作 API 即可调用。
事实上,消息传递网关使用提到的 MessagingTemplate.sendAndReceive()
。
我正在使用使用 Java DSL 的入站适配器从 SFTP 服务器轮询 pdf 文件。我有一个用例,在获取 pdf 文件后,应用程序将获取 SFTP 服务器上具有相同名称的 CSV 格式的配置文件。获取配置文件后,应用程序将使用配置文件中定义的属性处理原始 pdf 文件,并使用出站适配器将其上传回 SFTP 服务器。
我在使用出站网关在同一线程的处理程序中获取配置文件时遇到问题。
这是我的代码:
注册集成流程:
for (String client : clientsArr) {
this.flowContext.registration(getInboundIntegrationFlow(client)).register();
}
this.flowContext.registration(getOutboundIntegrationFlow()).register();
this.flowContext.registration(sftpGatewayGetIntegrationFlow()).register();
入站适配器集成流程:
@Autowired
private SftpPdfMessageHandler messageHandler;
private IntegrationFlow getInboundIntegrationFlow(String client) {
String remoteDirectory = getRemoteDirectory(client);
String localDirectory = getLocalDirectory(client);
String inboundAdapterId = getInboundAdapterId(client);
return IntegrationFlows
.from(Sftp.inboundAdapter(sftpSessionFactory())
.preserveTimestamp(true)
.remoteDirectory(remoteDirectory)
.autoCreateLocalDirectory(true)
.localDirectory(new File(localDirectory))
.maxFetchSize(Integer.parseInt(sftpProperties.getMaxFetchSize()))
.filter(new SftpSimplePatternFileListFilter(sftpProperties.getRemoteFileFilter()))
.deleteRemoteFiles(true),
e -> e.id(inboundAdapterId)
.autoStartup(true)
.poller(Pollers
.fixedDelay(Long.parseLong(sftpProperties.getPollPeriodInSeconds()), TimeUnit.SECONDS)
.receiveTimeout(Long.parseLong(sftpProperties.getPollerTimeout()))
.maxMessagesPerPoll(Long.parseLong(sftpProperties.getMaxMessagesPerPoll()))
))
.handle(inBoundHandler())
.get();
}
public MessageHandler inBoundHandler() {
return message -> {
File file = (File) message.getPayload();
messageHandler.handleMessage(file);
};
}
出站适配器集成流程:
private IntegrationFlow getOutboundIntegrationFlow() {
return IntegrationFlows.from("sftpOutboundChannel")
.handle(Sftp.outboundAdapter(sftpSessionFactory(), FileExistsMode.FAIL)
.remoteDirectoryExpression(String.format("headers['%s']", FileHeaders.REMOTE_DIRECTORY))).get();
}
@Bean("sftpOutboundChannel")
public MessageChannel sftpOutboundChannel() {
return new DirectChannel();
}
SFTP 消息处理程序:
@Async("sftpHandlerAsyncExecutor")
public void handleMessage(File originalFile) {
File configFile = fetchConfigFile();
/*
process original file and store processed file in output file path on local directory
*/
boolean success = uploadFileToSftpServer(outputFilePath, client, entity);
if (success) {
deleteFileFromLocal(originalFile);
}
}
出站网关 GET 集成流程:
private IntegrationFlow sftpGatewayGetIntegrationFlow() {
return IntegrationFlows.from("sftpGetInputChannel")
.handle(Sftp.outboundGateway(sftpSessionFactory(),
AbstractRemoteFileOutboundGateway.Command.GET, "payload")
.options(AbstractRemoteFileOutboundGateway.Option.DELETE,
AbstractRemoteFileOutboundGateway.Option.PRESERVE_TIMESTAMP)
.localDirectoryExpression(String.format("headers['%s']", Constants.HEADER_LOCAL_DIRECTORY_NAME))
.autoCreateLocalDirectory(true))
.channel("nullChannel")
.get();
}
@Bean("sftpGetInputChannel")
public MessageChannel sftpGetInputChannel() {
return new DirectChannel();
}
messageHandler.handleMessage()
方法在异步(使用 ThreadPoolTaskExecutor)中调用,它使用出站网关在内部获取配置文件。但是我找不到可以在同一个线程中发送和接收消息负载的单一通道。我在 spring 文档中找到了 MessagingTemplate,但找不到将其与我的出站网关集成流程连接起来的方法。
sftpGetMessageTemplate.sendAndReceive(sftpGetInputChannel, new GenericMessage<>("/dir/file.csv", headers))
使用 DirectChannel() 给出“Dispatcher 没有频道订阅者”异常。
我正在寻找一种解决方案,我可以通过以下任何一种方式从服务器获取所需的文件:
- 使用适当的渠道将 MessagingTemplate 与 IntegrationFlow 集成(如果可能)。
- 入站适配器流中的一些消息处理程序链接,在轮询原始文件后,它将使用 sftp 出站网关获取另一个文件,然后使用两个对象(原始文件和配置文件)调用最终处理程序。我正在尝试使用上面的自定义代码实现类似的事情。
- 在多线程环境中为 GET 命令使用发送和轮询通道的任何其他方式。
Application needs to decide the directory path on runtime while using the GET command.
您可能需要了解什么是 @MessagingGateway
以及如何使其与出站网关上的通道交互。
有关详细信息,请参阅文档:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/messaging-endpoints.html#gateway
如果你真的想得到一个配置文件,你不应该这样做 .channel("nullChannel")
。有了网关,将有 replyChannel
header 和网关填充的 TemporaryReplyChannel
实例。然后在您的代码中,您只需将该功能接口用作 API 即可调用。
事实上,消息传递网关使用提到的 MessagingTemplate.sendAndReceive()
。