Moving processed files on a remote (S)FTP server using the Java DSL
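Once a batch has successfully processed a file, I am trying to move that file on the remote SFTP server, using Spring Integration and the Java DSL.
What is the best way to achieve this?
- Add a step to the batch that moves the remote file?
- Or use the FTP outbound gateway and give it the MV command?
I prefer the second solution, so that the batch stays focused on the business logic, but I am struggling to implement it with the Java DSL.
I have read http://docs.spring.io/spring-integration/reference/html/ftp.html#ftp-outbound-gateway and tried to implement it like this: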
@Bean
public MessageHandler ftpOutboundGateway() {
    return Sftp.outboundGateway(SftpSessionFactory(),
                    AbstractRemoteFileOutboundGateway.Command.MV, "payload")
            .localDirectory(new File("/home/blabla/"))
            .get();
}

@Bean
public IntegrationFlow ftpInboundFlow() {
    return IntegrationFlows
            .from(
                    Sftp.inboundAdapter(SftpSessionFactory())
                            // the backslash must be doubled inside a Java string literal
                            .regexFilter(".*\\.xml.mini$")
                    ...
                    ,
                    e -> e.id("sftpInboundAdapter")
                            .poller(
                                    Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES)
                                            .maxMessagesPerPoll(-1)
                                            .advice(retryAdvice())
                            )
            )
            .enrichHeaders(h -> h
                    .header(FileHeaders.REMOTE_DIRECTORY, "/home/filedrop/")
                    .header(FileHeaders.REMOTE_FILE, "/home/filedrop/OFFERS.xml.mini")
                    .header(FileHeaders.RENAME_TO, "/home/filedrop/done/OFFERS.xml.mini")
            )
            .transform(fileToJobLaunchRequestTransformer())
            .handle(jobLaunchingGw())
            .transform(jobExecutionToFileStringTransformer())
            .handle(ftpOutboundGateway())
            .handle(logger())
            .get();
}
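I know my headers should be dynamic, but I don't know how to do that, so for now I am using the name of an existing file. I get this error message (it is trying to delete the file in the destination directory!):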
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.core.NestedIOException: Failed to delete file /home/filedrop/done/OFFERS.xml.mini; nested exception is org.springframework.core.NestedIOException: Failed to remove file: 2: No such file
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:343)
at org.springframework.integration.file.remote.RemoteFileTemplate.rename(RemoteFileTemplate.java:290)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.doMv(AbstractRemoteFileOutboundGateway.java:482)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.handleRequestMessage(AbstractRemoteFileOutboundGateway.java:400)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
... 94 more
Caused by: org.springframework.core.NestedIOException: Failed to delete file /home/filedrop/done/OFFERS.xml.mini; nested exception is org.springframework.core.NestedIOException: Failed to remove file: 2: No such file
at org.springframework.integration.sftp.session.SftpSession.rename(SftpSession.java:211)
at org.springframework.integration.file.remote.RemoteFileTemplate.doInSessionWithoutResult(RemoteFileTemplate.java:300)
at org.springframework.integration.file.remote.SessionCallbackWithoutResult.doInSession(SessionCallbackWithoutResult.java:34)
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:334)
... 100 more
Caused by: org.springframework.core.NestedIOException: Failed to remove file: 2: No such file
at org.springframework.integration.sftp.session.SftpSession.remove(SftpSession.java:83)
at org.springframework.integration.sftp.session.SftpSession.rename(SftpSession.java:205)
... 103 more
Thanks for your help!
EDIT
I have since simplified the workflow a lot, but here is the solution to the problem described above:
@Bean
public IntegrationFlow ftpInboundFlow() {
    return IntegrationFlows
            .from(
                    Sftp.inboundAdapter(SftpSessionFactory())
                            // the backslash must be doubled inside a Java string literal
                            .regexFilter(".*\\.xml$")
                    ...
                    ,
                    e -> e.id("sftpInboundAdapter")
                            .poller(Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES)
                                    .maxMessagesPerPoll(-1)
                            )
            )
            .enrichHeaders(h -> h
                    // headers necessary for moving remote files (ftpOutboundGateway)
                    .headerExpression(FileHeaders.RENAME_TO, "'/home/blabla/done/' + payload.getName()")
                    .headerExpression(FileHeaders.REMOTE_FILE, "payload.getName()")
                    .header(FileHeaders.REMOTE_DIRECTORY, "/home/blabla/")
                    // headers necessary for moving local files (fileOutboundGateway_MoveToProcessedDirectory)
                    .headerExpression(FileHeaders.ORIGINAL_FILE, "payload.getAbsolutePath()")
                    .headerExpression(FileHeaders.FILENAME, "payload.getName()")
            )
            .transform(fileToJobLaunchRequestTransformer())
            .handle(jobLaunchingGw(), e -> e.advice(retryAdvice()))
            // only move the files when the job completed successfully
            .<JobExecution, Boolean>route(p -> BatchStatus.COMPLETED.equals(p.getStatus()),
                    mapping -> mapping
                            .subFlowMapping("true", sf -> sf
                                    .handle(org.springframework.batch.core.JobExecution.class,
                                            (p, h) -> myServiceActivator.jobExecutionToString(p,
                                                    (String) h.get(FileHeaders.REMOTE_DIRECTORY),
                                                    (String) h.get(FileHeaders.REMOTE_FILE)))
                                    .handle(ftpOutboundGateway())
                                    .handle(Boolean.class,
                                            (p, h) -> myServiceActivator.BooleanToString(p,
                                                    (String) h.get(FileHeaders.FILENAME)))
                                    .handle(fileOutboundGateway_MoveToProcessedDirectory())
                            )
                            .subFlowMapping("false", sf -> sf
                                    .channel("nullChannel")
                            )
            )
            .handle(logger())
            .get();
}

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
    return Pollers.fixedRate(500).get();
}

@Bean
public MessageHandler ftpOutboundGateway() {
    return Sftp
            .outboundGateway(SftpSessionFactory(),
                    AbstractRemoteFileOutboundGateway.Command.MV,
                    "payload")
            .renameExpression("headers['file_renameTo']").get();
}
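You probably do not have permission to rename, or the rename failed for some other reason; this exception comes from the attempt to remove the "to" file, which is made because the initial rename failed. Turn on DEBUG logging and you should see this log...
if (logger.isDebugEnabled()){
    logger.debug("Initial File rename failed, possibly because file already exists. Will attempt to delete file: "
            + pathTo + " and execute rename again.");
}
try {
    this.remove(pathTo);
Since this remove() operation failed, your failure indicates that the rename failed for some other reason (because the "to" file clearly does not exist).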
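The rename, delete, retry sequence described above can be reproduced on a local file system. The following is only an analogy built on `java.nio.file`, not the Spring Integration implementation: the class `RenameFallbackSketch`, its `Outcome` enum, and the two demo methods are hypothetical, and the missing `done` directory is just one assumed example of "some other reason".

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

final class RenameFallbackSketch {

    enum Outcome { RENAMED, RENAMED_AFTER_DELETE, FAILED_FOR_OTHER_REASON }

    // Try a plain rename; on failure assume the target exists, delete it, and retry.
    static Outcome rename(Path from, Path to) {
        try {
            Files.move(from, to); // no REPLACE_EXISTING, so an existing target makes this fail
            return Outcome.RENAMED;
        } catch (IOException initialFailure) {
            try {
                Files.delete(to);
            } catch (NoSuchFileException targetMissing) {
                // Nothing to delete: the first failure was not caused by an existing target.
                return Outcome.FAILED_FOR_OTHER_REASON;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            try {
                Files.move(from, to);
                return Outcome.RENAMED_AFTER_DELETE;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    // Assumed scenario: the target directory "done" was never created.
    static Outcome demoMissingTargetDirectory() {
        try {
            Path dir = Files.createTempDirectory("filedrop");
            Path from = Files.createFile(dir.resolve("OFFERS.xml.mini"));
            return rename(from, dir.resolve("done").resolve("OFFERS.xml.mini"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Scenario the fallback is designed for: the target file already exists.
    static Outcome demoExistingTarget() {
        try {
            Path dir = Files.createTempDirectory("filedrop");
            Path from = Files.createFile(dir.resolve("OFFERS.xml.mini"));
            Path done = Files.createDirectory(dir.resolve("done"));
            Path to = Files.createFile(done.resolve("OFFERS.xml.mini"));
            return rename(from, to);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demoMissingTargetDirectory());
        System.out.println(demoExistingTarget());
    }
}
```

In the first demo the delete of the target fails with "no such file", which mirrors the nested exception in the stack trace: the delete error only masks the real cause of the initial rename failure, so that initial failure is what needs to be investigated.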
Take a look: I delete the file in the ftpInbound flow and, after processing, upload it again into the new directory.