Spring 尽管设置为无限制,但集成 (SFTP) 消息源每次轮询获取的文件不超过 1 个
Spring Integration (SFTP) message source isn't getting more than 1 file per poll despite setting to unlimited
我有以下代码从 sftp 服务器读取 xml 文件作为 InputStream:
@Configuration
public class SftpConfig {
...
@Bean
@InboundChannelAdapter(channel = "stream", poller = @Poller(fixedDelay="60000"))
public MessageSource<InputStream> messageSource() {
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
messageSource.setRemoteDirectory(sftpProperties.getBaseDir());
messageSource.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
// messageSource.setMaxFetchSize(-1); no matter what i set this to, it only fetches one file
return messageSource;
}
@ServiceActivator(inputChannel = "stream", adviceChain = "after")
@Bean
public MessageHandler handle() {
return message -> {
Assert.isTrue(message.getPayload() instanceof InputStream, "Payload must be of type $InputStream");
String filename = (String) message.getHeaders().get(FileHeaders.REMOTE_FILE);
InputStream is = (InputStream) message.getPayload();
log.info("I am here"); // each poll only prints this once
};
}
...
}
当我调试或检查 MessageHanlder$handleMessage
的日志时,我一直只看到一条消息(文件对象)通过。 sftp 服务器上有多个 .xml
文件,我可以通过在下一次轮询中查看文件来验证这一点。文档说
/**
* Set the maximum number of objects the source should fetch if it is necessary to
* fetch objects. Setting the
* maxFetchSize to 0 disables remote fetching, a negative value indicates no limit.
* @param maxFetchSize the max fetch size; a negative value means unlimited.
*/
void setMaxFetchSize(int maxFetchSize);
所以我摆弄了不同的数字但无济于事。我在这里错过了什么?
抱歉造成误导,但 fetch
并不意味着 poll
。获取选项只在第一次轮询时将尽可能多的远程实体带到本地缓存,并且每个后续轮询只从该缓存中获取条目,直到它耗尽。
关于每次轮询的最大消息数的选项属于该 @Poller
配置。查看相应的选项:
/**
* @return The maximum number of messages to receive for each poll.
* Can be specified as 'property placeholder', e.g. {@code ${poller.maxMessagesPerPoll}}.
* Defaults to -1 (infinity) for polling consumers and 1 for polling inbound channel adapters.
*/
String maxMessagesPerPoll() default "";
注意那个1 for polling inbound channel adapters
。这就是您只看到一条消息的方式。
尽管如此,逻辑就像只向频道推送一条消息。您现在有多少文件没有批处理。与 fetch
perPoll
无关,只有一条消息被发送到频道。尽管我同意无限 perPoll
所有消息都在同一个线程和同一个轮询周期内发送。
我有以下代码从 sftp 服务器读取 xml 文件作为 InputStream:
@Configuration
public class SftpConfig {
...
@Bean
@InboundChannelAdapter(channel = "stream", poller = @Poller(fixedDelay="60000"))
public MessageSource<InputStream> messageSource() {
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
messageSource.setRemoteDirectory(sftpProperties.getBaseDir());
messageSource.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
// messageSource.setMaxFetchSize(-1); no matter what i set this to, it only fetches one file
return messageSource;
}
@ServiceActivator(inputChannel = "stream", adviceChain = "after")
@Bean
public MessageHandler handle() {
return message -> {
Assert.isTrue(message.getPayload() instanceof InputStream, "Payload must be of type $InputStream");
String filename = (String) message.getHeaders().get(FileHeaders.REMOTE_FILE);
InputStream is = (InputStream) message.getPayload();
log.info("I am here"); // each poll only prints this once
};
}
...
}
当我调试或检查 MessageHanlder$handleMessage
的日志时,我一直只看到一条消息(文件对象)通过。 sftp 服务器上有多个 .xml
文件,我可以通过在下一次轮询中查看文件来验证这一点。文档说
/**
* Set the maximum number of objects the source should fetch if it is necessary to
* fetch objects. Setting the
* maxFetchSize to 0 disables remote fetching, a negative value indicates no limit.
* @param maxFetchSize the max fetch size; a negative value means unlimited.
*/
void setMaxFetchSize(int maxFetchSize);
所以我摆弄了不同的数字但无济于事。我在这里错过了什么?
抱歉造成误导,但 fetch
并不意味着 poll
。获取选项只在第一次轮询时将尽可能多的远程实体带到本地缓存,并且每个后续轮询只从该缓存中获取条目,直到它耗尽。
关于每次轮询的最大消息数的选项属于该 @Poller
配置。查看相应的选项:
/**
* @return The maximum number of messages to receive for each poll.
* Can be specified as 'property placeholder', e.g. {@code ${poller.maxMessagesPerPoll}}.
* Defaults to -1 (infinity) for polling consumers and 1 for polling inbound channel adapters.
*/
String maxMessagesPerPoll() default "";
注意那个1 for polling inbound channel adapters
。这就是您只看到一条消息的方式。
尽管如此,逻辑就像只向频道推送一条消息。您现在有多少文件没有批处理。与 fetch
perPoll
无关,只有一条消息被发送到频道。尽管我同意无限 perPoll
所有消息都在同一个线程和同一个轮询周期内发送。