Spring 尽管设置为无限制,但集成 (SFTP) 消息源每次轮询获取的文件不超过 1 个

Spring Integration (SFTP) message source isn't getting more than 1 file per poll despite setting to unlimited

我有以下代码从 sftp 服务器读取 xml 文件作为 InputStream:

@Configuration
public class SftpConfig {
    ...

    @Bean
    @InboundChannelAdapter(channel = "stream", poller = @Poller(fixedDelay="60000"))
    public MessageSource<InputStream> messageSource() {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
        messageSource.setRemoteDirectory(sftpProperties.getBaseDir());
        messageSource.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
        // messageSource.setMaxFetchSize(-1); no matter what i set this to, it only fetches one file
        return messageSource;
    }

    @ServiceActivator(inputChannel = "stream", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return message -> {
            Assert.isTrue(message.getPayload() instanceof InputStream, "Payload must be of type $InputStream");
            String filename = (String) message.getHeaders().get(FileHeaders.REMOTE_FILE);
            InputStream is = (InputStream) message.getPayload();
            log.info("I am here"); // each poll only prints this once
        };
    }
    ...
}

当我调试或检查 MessageHanlder$handleMessage 的日志时,我一直只看到一条消息(文件对象)通过。 sftp 服务器上有多个 .xml 文件,我可以通过在下一次轮询中查看文件来验证这一点。文档说

    /**
     * Set the maximum number of objects the source should fetch if it is necessary to
     * fetch objects. Setting the
     * maxFetchSize to 0 disables remote fetching, a negative value indicates no limit.
     * @param maxFetchSize the max fetch size; a negative value means unlimited.
     */
    void setMaxFetchSize(int maxFetchSize);

所以我摆弄了不同的数字但无济于事。我在这里错过了什么?

抱歉造成误导,但 fetch 并不意味着 poll。获取选项只在第一次轮询时将尽可能多的远程实体带到本地缓存,并且每个后续轮询只从该缓存中获取条目,直到它耗尽。

关于每次轮询的最大消息数的选项属于该 @Poller 配置。查看相应的选项:

/**
 * @return The maximum number of messages to receive for each poll.
 * Can be specified as 'property placeholder', e.g. {@code ${poller.maxMessagesPerPoll}}.
 * Defaults to -1 (infinity) for polling consumers and 1 for polling inbound channel adapters.
 */
String maxMessagesPerPoll() default "";

注意那个1 for polling inbound channel adapters。这就是您只看到一条消息的方式。

尽管如此,逻辑就像只向频道推送一条消息。您现在有多少文件没有批处理。与 fetch perPoll 无关,只有一条消息被发送到频道。尽管我同意无限 perPoll 所有消息都在同一个线程和同一个轮询周期内发送。