spring 集成 - 从 FTP 读取文件并处理它。我们如何实现metastore并在不轮询的情况下进行处理

spring integration - read file from FTP and process it. How can we achieve metastore and process it without polling

在 spring 集成中 - 我正在尝试从 FTP 读取文件并处理它。我们如何在不轮询的情况下实现 Metastore 并对其进行处理。 在下面的配置中,为了避免读取相同的文件,如果发生服务器重启,我在 ftpChannel 中引入了消息存储。 现在,文件的处理器是服务激活器,需要轮询。我怎样才能避免在服务激活器中进行轮询并立即从 ftpChannel 队列中读取文件。如果我使用 int:dispatcher 那么,我将无法使用消息存储。

我们如何解决这个问题?

<int:channel id="ftpChannel">
    <int:queue message-store="mongoDbMessageStore" />
    <!-- <int:dispatcher task-executor="taskExecutor"/> -->
</int:channel>

<bean id="mongoDbMessageStore"
    class="org.springframework.integration.mongodb.store.MongoDbMessageStore">
    <constructor-arg name="mongoDbFactory" ref="mongoDbFactory" />
    <constructor-arg name="collectionName" value="ftpInfo" />
</bean>

<int-ftp:inbound-channel-adapter id="ftpInbound"
    channel="ftpChannel" session-factory="ftpClientFactory" charset="UTF-8"
    auto-create-local-directory="true" delete-remote-files="false"
    filename-pattern="*.gz" remote-directory="/myfilerepo/#{istDate.getISTDate()}"
    remote-file-separator="/" local-filename-generator-expression="#this.toUpperCase()"
    temporary-file-suffix=".writing" preserve-timestamp="true"
    local-directory="/temp/spring/#{istDate.getISTDate()}">

     <int:poller cron="0-5 0/5 * * * ?" max-messages-per-poll="-1"/>
</int-ftp:inbound-channel-adapter>

<int:service-activator id="jobServiceActivator"
    input-channel="ftpChannel" ref="triggerJobLauncher" method="launch">
    <int:poller fixed-delay="10" />
</int:service-activator>

<!-- job context -->
<bean id="jobRepository"
    class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
    <property name="transactionManager" ref="transactionManager" />
</bean>

<bean id="transactionManager"
    class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

<bean id="jobLauncher"
    class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
</bean>
<!-- job context -->

您不需要在频道上 message-store;您需要在 local-filter 中使用 FtpPersistentAcceptOnceFileListFilter in the filter and/or a FileSystemPersistentAcceptOnceFileListFilter 以避免在系统重启后重新处理文件。

他们需要一个MetadataStore如果你想使用 mongo 你需要实现一个;该框架目前没有 mongo 实现。

编辑:

从 4.2 版开始,框架 now has a mongo MetadataStore

根据您的担忧 to avoid reading the same file from the QueueChannel,我想说您担心 Spring 应用程序在应用程序启动后总是处理相同的消息。但事实并非如此。当消息从那里 polled 时,消息将从队列中删除(当然,从 MessageStore 中删除)。如果您 MessageStore 是事务性资源(例如 JDBC),消息记录将被标记为删除,直到 TX 提交或回滚。 虽然 MongoDB 不是这种情况,但是队列中的任何消息都只会被轮询一次,即使您有这样一个应用程序的集群也是如此。

所以,我不明白为什么你确定 <service-activator> 接受相同的文件(与 payload)因为你重新启动了应用程序。

从另一方面来说,如果您想访问由您的 ftpChannel 支持的 MessageGroup,您可以这样做:

mongoDbMessageStore.getMessageGroup("mongoDbMessageStore:ftpChannel");

从另一端,您总是可以 purge QueueChannel 手动注入 ftpChannel 到某些服务,如 QueueChannelOperations