Spring 与 RedisLockRegistry 集成示例

Spring Integration with RedisLockRegistry example

我们正在实施一个流程,其中 <int-sftp:inbound-streaming-channel-adapter/> 轮询目录中的文件,并在找到时将流传递给服务激活器。

问题是我们将有多个应用程序实例 运行,我们想锁定进程,以便只有一个实例可以获取文件。

查看文档,Redis Lock Registry 看起来是解决方案,是否有在 xml 中使用的示例?

我所能找到的只是对它的一些引用和它的源代码。

http://docs.spring.io/spring-integration/reference/html/redis.html点24.1

添加信息: 我添加了 RedisMetaDataStore 和 SftpSimplePatternFileListFilter。它确实有效,但它确实有一个奇怪之处,当轮询器激活 sftpInboundAdapter 时,它会为元数据存储中的每个文件添加一个条目。假设有 10 个文件,数据存储中将有 10 个条目,但它不会“一次处理”所有 10 个文件,适配器每次轮询仅处理 1 个文件,这很好,但在多实例中环境 如果拾取文件的服务器在处理 5 个文件后出现故障,另一台服务器似乎不会拾取剩余的 5 个文件,除非文件是 "touched"。

每次轮询选取 1 个文件的行为是正确的还是应该在一次轮询期间处理所有有效文件。

下面是我的XML

   <int:channel id="sftpInbound"/> <!--  To Java -->
<int:channel id="sftpOutbound"/>
<int:channel id="sftpStreamTransformer"/>

<int-sftp:inbound-streaming-channel-adapter id="sftpInboundAdapter"
        channel="sftpInbound"
        session-factory="sftpSessionFactory"
        filter="compositeFilter"
        remote-file-separator="/"
        remote-directory="${sftp.directory}">
    <int:poller cron="${sftp.cron}"/>
</int-sftp:inbound-streaming-channel-adapter>

<int:stream-transformer input-channel="sftpStreamTransformer" output-channel="sftpOutbound"/>

<bean id="compositeFilter"
    class="org.springframework.integration.file.filters.CompositeFileListFilter">
    <constructor-arg>
        <list>
            <bean
                class="org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter">
                <constructor-arg value="Receipt*.txt" />
            </bean>
            <bean id="SftpPersistentAcceptOnceFileListFilter" class="org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter">
                <constructor-arg ref="metadataStore" />                 
                <constructor-arg value="ReceiptLock_" />                    
            </bean>
        </list>
    </constructor-arg>
</bean>

<bean id="redisConnectionFactory"
    class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
    <property name="port" value="${redis.port}" />
    <property name="password" value="${redis.password}" />
    <property name="hostName" value="${redis.host}" />
</bean>

否;您需要将 SftpPersistentAcceptOnceFileListFilter (docs here) 与 Redis(或其他一些)元数据存储一起使用,而不是锁注册表。

编辑

关于您在下方的评论。

是的,这是一个已知问题;在 next release 中,我们添加了一个 max-fetch-size 正是出于这个原因 - 因此每个实例都可以检索一些文件,而不是第一个实例获取所有文件。

(入站适配器的工作方式是首先将找到的文件(不在存储中)复制到本地磁盘,然后一次一个地发出它们)。

5.0 目前仅作为里程碑提供 M2 at the time of writing, but the current version and milestone repo can be found here;几个月后才会发布。

另一种选择是使用出站网关——一个用于 LS 文件,一个用于获取单个文件;不过,您的应用必须使用元数据存储本身来确定可以获取哪些文件。