spring 集成 - 从 FTP 读取文件并处理它。我们如何实现metastore并在不轮询的情况下进行处理
spring integration - read file from FTP and process it. How can we achieve metastore and process it without polling
在 spring 集成中 - 我正在尝试从 FTP 读取文件并处理它。我们如何在不轮询的情况下实现 Metastore 并对其进行处理。
在下面的配置中,为了避免读取相同的文件,如果发生服务器重启,我在 ftpChannel 中引入了消息存储。
现在,文件的处理器是服务激活器,需要轮询。我怎样才能避免在服务激活器中进行轮询并立即从 ftpChannel 队列中读取文件。如果我使用 int:dispatcher 那么,我将无法使用消息存储。
我们如何解决这个问题?
<int:channel id="ftpChannel">
<int:queue message-store="mongoDbMessageStore" />
<!-- <int:dispatcher task-executor="taskExecutor"/> -->
</int:channel>
<bean id="mongoDbMessageStore"
class="org.springframework.integration.mongodb.store.MongoDbMessageStore">
<constructor-arg name="mongoDbFactory" ref="mongoDbFactory" />
<constructor-arg name="collectionName" value="ftpInfo" />
</bean>
<int-ftp:inbound-channel-adapter id="ftpInbound"
channel="ftpChannel" session-factory="ftpClientFactory" charset="UTF-8"
auto-create-local-directory="true" delete-remote-files="false"
filename-pattern="*.gz" remote-directory="/myfilerepo/#{istDate.getISTDate()}"
remote-file-separator="/" local-filename-generator-expression="#this.toUpperCase()"
temporary-file-suffix=".writing" preserve-timestamp="true"
local-directory="/temp/spring/#{istDate.getISTDate()}">
<int:poller cron="0-5 0/5 * * * ?" max-messages-per-poll="-1"/>
</int-ftp:inbound-channel-adapter>
<int:service-activator id="jobServiceActivator"
input-channel="ftpChannel" ref="triggerJobLauncher" method="launch">
<int:poller fixed-delay="10" />
</int:service-activator>
<!-- job context -->
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager" />
</bean>
<bean id="transactionManager"
class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
<!-- job context -->
您不需要在频道上 message-store
;您需要在 local-filter
中使用 FtpPersistentAcceptOnceFileListFilter in the filter
and/or a FileSystemPersistentAcceptOnceFileListFilter 以避免在系统重启后重新处理文件。
他们需要一个MetadataStore
; 如果你想使用 mongo 你需要实现一个;该框架目前没有 mongo 实现。
编辑:
从 4.2 版开始,框架 now has a mongo MetadataStore
。
根据您的担忧 to avoid reading the same file from the QueueChannel
,我想说您担心 Spring 应用程序在应用程序启动后总是处理相同的消息。但事实并非如此。当消息从那里 polled
时,消息将从队列中删除(当然,从 MessageStore
中删除)。如果您 MessageStore
是事务性资源(例如 JDBC),消息记录将被标记为删除,直到 TX 提交或回滚。
虽然 MongoDB 不是这种情况,但是队列中的任何消息都只会被轮询一次,即使您有这样一个应用程序的集群也是如此。
所以,我不明白为什么你确定 <service-activator>
接受相同的文件(与 payload
)因为你重新启动了应用程序。
从另一方面来说,如果您想访问由您的 ftpChannel
支持的 MessageGroup
,您可以这样做:
mongoDbMessageStore.getMessageGroup("mongoDbMessageStore:ftpChannel");
从另一端,您总是可以 purge
QueueChannel
手动注入 ftpChannel
到某些服务,如 QueueChannelOperations
。
在 spring 集成中 - 我正在尝试从 FTP 读取文件并处理它。我们如何在不轮询的情况下实现 Metastore 并对其进行处理。 在下面的配置中,为了避免读取相同的文件,如果发生服务器重启,我在 ftpChannel 中引入了消息存储。 现在,文件的处理器是服务激活器,需要轮询。我怎样才能避免在服务激活器中进行轮询并立即从 ftpChannel 队列中读取文件。如果我使用 int:dispatcher 那么,我将无法使用消息存储。
我们如何解决这个问题?
<int:channel id="ftpChannel">
<int:queue message-store="mongoDbMessageStore" />
<!-- <int:dispatcher task-executor="taskExecutor"/> -->
</int:channel>
<bean id="mongoDbMessageStore"
class="org.springframework.integration.mongodb.store.MongoDbMessageStore">
<constructor-arg name="mongoDbFactory" ref="mongoDbFactory" />
<constructor-arg name="collectionName" value="ftpInfo" />
</bean>
<int-ftp:inbound-channel-adapter id="ftpInbound"
channel="ftpChannel" session-factory="ftpClientFactory" charset="UTF-8"
auto-create-local-directory="true" delete-remote-files="false"
filename-pattern="*.gz" remote-directory="/myfilerepo/#{istDate.getISTDate()}"
remote-file-separator="/" local-filename-generator-expression="#this.toUpperCase()"
temporary-file-suffix=".writing" preserve-timestamp="true"
local-directory="/temp/spring/#{istDate.getISTDate()}">
<int:poller cron="0-5 0/5 * * * ?" max-messages-per-poll="-1"/>
</int-ftp:inbound-channel-adapter>
<int:service-activator id="jobServiceActivator"
input-channel="ftpChannel" ref="triggerJobLauncher" method="launch">
<int:poller fixed-delay="10" />
</int:service-activator>
<!-- job context -->
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager" />
</bean>
<bean id="transactionManager"
class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
<!-- job context -->
您不需要在频道上 message-store
;您需要在 local-filter
中使用 FtpPersistentAcceptOnceFileListFilter in the filter
and/or a FileSystemPersistentAcceptOnceFileListFilter 以避免在系统重启后重新处理文件。
他们需要一个MetadataStore
; 如果你想使用 mongo 你需要实现一个;该框架目前没有 mongo 实现。
编辑:
从 4.2 版开始,框架 now has a mongo MetadataStore
。
根据您的担忧 to avoid reading the same file from the QueueChannel
,我想说您担心 Spring 应用程序在应用程序启动后总是处理相同的消息。但事实并非如此。当消息从那里 polled
时,消息将从队列中删除(当然,从 MessageStore
中删除)。如果您 MessageStore
是事务性资源(例如 JDBC),消息记录将被标记为删除,直到 TX 提交或回滚。
虽然 MongoDB 不是这种情况,但是队列中的任何消息都只会被轮询一次,即使您有这样一个应用程序的集群也是如此。
所以,我不明白为什么你确定 <service-activator>
接受相同的文件(与 payload
)因为你重新启动了应用程序。
从另一方面来说,如果您想访问由您的 ftpChannel
支持的 MessageGroup
,您可以这样做:
mongoDbMessageStore.getMessageGroup("mongoDbMessageStore:ftpChannel");
从另一端,您总是可以 purge
QueueChannel
手动注入 ftpChannel
到某些服务,如 QueueChannelOperations
。