Spring 集成流 SFTP:模式过滤器不工作
Spring integration stream SFTP: pattern filter not working
我有以下初始 spring 集成流程:
final var inboundStreamingAdapter = Sftp.inboundStreamingAdapter(new SftpRemoteFileTemplate(sftpSessionFactory))
.patternFilter("*.csv")
.remoteDirectory(sftpRemoteDirectoryDownload);
IntegrationFlows.from(inboundStreamingAdapter,
c -> c.poller(Pollers.trigger(new PeriodicTrigger(10)).maxMessagesPerPoll(1)))
.channel("data")
.split(mySplitter)
.get();
我的 sftp 服务器上有以下文件:
.DS_Store
PL_12002_1815_1.csv1
根据我的理解,适配器应该过滤掉所有不以 csv 后缀结尾的内容。
实际发生的情况是,某些文件 (.DS_Store) 有时会进入 mySplitter,具体取决于当前存储在 FTP.
上的内容
据我所知,问题出在 AbstractRemoteFileStreamingMessageSource(用作 SftpStreamingMessageSource class 的基础 class):
if (this.filter != null && this.filter.supportsSingleFileFiltering()
&& !this.filter.accept(file.getFileInfo())) { //HERE APPLY A FILTER AND FIND OUT PL_12002_1815_1.csv1 SHOULD NOT BE PROCESSED
if (this.toBeReceived.size() > 0) { // don't re-fetch already filtered files
//HERE WE LOAD ANOTHER FILE FROM THE QUEUE (.DS_Store)
file = poll();
}
else {
file = null;
}
}
//FILTER IS NOT REAPPLIED FURTHER DOWN AND WE CREATE A MESSAGE FOR .DS_Store
if (file != null) {
try {
String remotePath = remotePath(file);
Session<?> session = this.remoteFileTemplate.getSession();
try {
return getMessageBuilderFactory()
.withPayload(session.readRaw(remotePath))
.setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session)
.setHeader(FileHeaders.REMOTE_DIRECTORY, file.getRemoteDirectory())
.setHeader(FileHeaders.REMOTE_FILE, file.getFilename())
.setHeader(FileHeaders.REMOTE_HOST_PORT, session.getHostPort())
.setHeader(FileHeaders.REMOTE_FILE_INFO,
this.fileInfoJson ? file.toJson() : file);
}
我会说这是一个错误,但也许我只是配置错误,有人可以帮忙吗?
看起来像一个错误; if
应该是 while
(带有空检查)。
我有以下初始 spring 集成流程:
final var inboundStreamingAdapter = Sftp.inboundStreamingAdapter(new SftpRemoteFileTemplate(sftpSessionFactory))
.patternFilter("*.csv")
.remoteDirectory(sftpRemoteDirectoryDownload);
IntegrationFlows.from(inboundStreamingAdapter,
c -> c.poller(Pollers.trigger(new PeriodicTrigger(10)).maxMessagesPerPoll(1)))
.channel("data")
.split(mySplitter)
.get();
我的 sftp 服务器上有以下文件:
.DS_Store
PL_12002_1815_1.csv1
根据我的理解,适配器应该过滤掉所有不以 csv 后缀结尾的内容。
实际发生的情况是,某些文件 (.DS_Store) 有时会进入 mySplitter,具体取决于当前存储在 FTP.
上的内容据我所知,问题出在 AbstractRemoteFileStreamingMessageSource(用作 SftpStreamingMessageSource class 的基础 class):
if (this.filter != null && this.filter.supportsSingleFileFiltering()
&& !this.filter.accept(file.getFileInfo())) { //HERE APPLY A FILTER AND FIND OUT PL_12002_1815_1.csv1 SHOULD NOT BE PROCESSED
if (this.toBeReceived.size() > 0) { // don't re-fetch already filtered files
//HERE WE LOAD ANOTHER FILE FROM THE QUEUE (.DS_Store)
file = poll();
}
else {
file = null;
}
}
//FILTER IS NOT REAPPLIED FURTHER DOWN AND WE CREATE A MESSAGE FOR .DS_Store
if (file != null) {
try {
String remotePath = remotePath(file);
Session<?> session = this.remoteFileTemplate.getSession();
try {
return getMessageBuilderFactory()
.withPayload(session.readRaw(remotePath))
.setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session)
.setHeader(FileHeaders.REMOTE_DIRECTORY, file.getRemoteDirectory())
.setHeader(FileHeaders.REMOTE_FILE, file.getFilename())
.setHeader(FileHeaders.REMOTE_HOST_PORT, session.getHostPort())
.setHeader(FileHeaders.REMOTE_FILE_INFO,
this.fileInfoJson ? file.toJson() : file);
}
我会说这是一个错误,但也许我只是配置错误,有人可以帮忙吗?
看起来像一个错误; if
应该是 while
(带有空检查)。