应用过滤器时,流式 MessageSource 会继续触发
StreamingMessageSource keeps firing when a filter is applied
我正在尝试为某种文件轮询 FTP 目录,目录轮询有效,但每当我尝试应用过滤器按扩展名过滤文件时,消息源不断发送垃圾邮件有关文件的消息,而不考虑轮询延迟。没有过滤器,一切正常,一旦我启用它们,我的应用程序就会使用 FTP 进行身份验证,下载文件并一遍又一遍地不停地发送消息。我有以下豆类:
/**
* Factory that creates the remote connection
*
* @return DefaultSftpSessionFactory
*/
@Bean
public DefaultSftpSessionFactory sftpSessionFactory(@Value("${ftp.host}") String ftpHost,
@Value("${ftp.port}") int ftpPort,
@Value("${ftp.user}") String ftpUser,
@Value("${ftp.pass}") String ftpPass) {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
factory.setAllowUnknownKeys(true);
factory.setHost(ftpHost);
factory.setPort(ftpPort);
factory.setUser(ftpUser);
factory.setPassword(ftpPass);
return factory;
}
/**
* Template to handle remote files
*
* @param sessionFactory SessionFactory bean
* @return SftpRemoteFileTemplate
*/
@Bean
public SftpRemoteFileTemplate fileTemplate(DefaultSftpSessionFactory sessionFactory) {
SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(sessionFactory);
template.setAutoCreateDirectory(true);
template.setUseTemporaryFileName(false);
return template;
}
/**
* To listen to multiple directories, declare multiples of this bean with the same inbound channel
*
* @param fileTemplate FileTemplate bean
* @return MessageSource
*/
@Bean
@InboundChannelAdapter(channel = "deeplinkAutomated", poller = @Poller(fixedDelay = "6000", maxMessagesPerPoll = "-1"))
public MessageSource inboundChannelAdapter(SftpRemoteFileTemplate fileTemplate) {
SftpStreamingMessageSource source = new SftpStreamingMessageSource(fileTemplate);
source.setRemoteDirectory("/upload");
source.setFilter(new CompositeFileListFilter<>(
Arrays.asList(new AcceptOnceFileListFilter<>(), new SftpSimplePatternFileListFilter("*.trg"))
));
return source;
}
/**
* Listener that activates on new messages on the specified input channel
*
* @return MessageHandler
*/
@Bean
@ServiceActivator(inputChannel = "deeplinkAutomated")
public MessageHandler handler(JobLauncher jobLauncher, Job deeplinkBatch) {
return message -> {
Gson gson = new Gson();
SFTPFileInfo info = gson.fromJson((String) message.getHeaders().get("file_remoteFileInfo"), SFTPFileInfo.class);
System.out.println("File to download: " + info.getFilename().replace(".trg", ".xml"));
};
}
我认为 AcceptOnceFileListFilter
不适合 SFTP 文件。返回的 LsEntry
与之前存储在 HashSet
中的不匹配:只是它们的哈希不同!
考虑改用 SftpPersistentAcceptOnceFileListFilter
。
另外isSharedSession
配置一个DefaultSftpSessionFactory
会更好:
/**
* @param isSharedSession true if the session is to be shared.
*/
public DefaultSftpSessionFactory(boolean isSharedSession) {
避免在每个轮询任务上重新创建会话。
你没有 6 秒的通话延迟,因为你有 maxMessagesPerPoll = "-1"
。这意味着轮询远程文件,直到它们位于远程目录中。在您使用 AcceptOnceFileListFilter
的情况下,由于哈希原因,您总是会得到相同的文件。
我正在尝试为某种文件轮询 FTP 目录,目录轮询有效,但每当我尝试应用过滤器按扩展名过滤文件时,消息源不断发送垃圾邮件有关文件的消息,而不考虑轮询延迟。没有过滤器,一切正常,一旦我启用它们,我的应用程序就会使用 FTP 进行身份验证,下载文件并一遍又一遍地不停地发送消息。我有以下豆类:
/**
* Factory that creates the remote connection
*
* @return DefaultSftpSessionFactory
*/
@Bean
public DefaultSftpSessionFactory sftpSessionFactory(@Value("${ftp.host}") String ftpHost,
@Value("${ftp.port}") int ftpPort,
@Value("${ftp.user}") String ftpUser,
@Value("${ftp.pass}") String ftpPass) {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
factory.setAllowUnknownKeys(true);
factory.setHost(ftpHost);
factory.setPort(ftpPort);
factory.setUser(ftpUser);
factory.setPassword(ftpPass);
return factory;
}
/**
* Template to handle remote files
*
* @param sessionFactory SessionFactory bean
* @return SftpRemoteFileTemplate
*/
@Bean
public SftpRemoteFileTemplate fileTemplate(DefaultSftpSessionFactory sessionFactory) {
SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(sessionFactory);
template.setAutoCreateDirectory(true);
template.setUseTemporaryFileName(false);
return template;
}
/**
* To listen to multiple directories, declare multiples of this bean with the same inbound channel
*
* @param fileTemplate FileTemplate bean
* @return MessageSource
*/
@Bean
@InboundChannelAdapter(channel = "deeplinkAutomated", poller = @Poller(fixedDelay = "6000", maxMessagesPerPoll = "-1"))
public MessageSource inboundChannelAdapter(SftpRemoteFileTemplate fileTemplate) {
SftpStreamingMessageSource source = new SftpStreamingMessageSource(fileTemplate);
source.setRemoteDirectory("/upload");
source.setFilter(new CompositeFileListFilter<>(
Arrays.asList(new AcceptOnceFileListFilter<>(), new SftpSimplePatternFileListFilter("*.trg"))
));
return source;
}
/**
* Listener that activates on new messages on the specified input channel
*
* @return MessageHandler
*/
@Bean
@ServiceActivator(inputChannel = "deeplinkAutomated")
public MessageHandler handler(JobLauncher jobLauncher, Job deeplinkBatch) {
return message -> {
Gson gson = new Gson();
SFTPFileInfo info = gson.fromJson((String) message.getHeaders().get("file_remoteFileInfo"), SFTPFileInfo.class);
System.out.println("File to download: " + info.getFilename().replace(".trg", ".xml"));
};
}
我认为 AcceptOnceFileListFilter
不适合 SFTP 文件。返回的 LsEntry
与之前存储在 HashSet
中的不匹配:只是它们的哈希不同!
考虑改用 SftpPersistentAcceptOnceFileListFilter
。
另外isSharedSession
配置一个DefaultSftpSessionFactory
会更好:
/**
* @param isSharedSession true if the session is to be shared.
*/
public DefaultSftpSessionFactory(boolean isSharedSession) {
避免在每个轮询任务上重新创建会话。
你没有 6 秒的通话延迟,因为你有 maxMessagesPerPoll = "-1"
。这意味着轮询远程文件,直到它们位于远程目录中。在您使用 AcceptOnceFileListFilter
的情况下,由于哈希原因,您总是会得到相同的文件。