Spring XD源故障处理

Spring XD source failure handling

我有一个流,其中包含一个与 source:file 模块行为相似的模块,除了使用 redisMetaDataStore 和 Spring Integration 的 FileSystemPersistentAcceptOnceFileListFilter 仅检测一次文件之外,以便模块记住在容器重新启动之间已经读取了哪些文件。

文件-persisting.xml

    <beans:bean id="redisMetadataStore" class="org.springframework.integration.redis.metadata.RedisMetadataStore" >
        <beans:constructor-arg ref="redisConnectionFactory" />
    </beans:bean>

    <beans:bean id="persistentAcceptOnceFileFilter" class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
        <beans:constructor-arg ref="redisMetadataStore" />
        <beans:constructor-arg value="${metadataPrefix}" />
    </beans:bean>

    <beans:bean id="antPatternFileFilter" class="org.springframework.integration.file.filters.SimplePatternFileListFilter">
        <beans:constructor-arg value="${pattern}" />
    </beans:bean>

    <beans:bean id="compositeFileFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
        <beans:constructor-arg>
            <beans:list>
                <beans:ref bean="antPatternFileFilter" />
                <beans:ref bean="persistentAcceptOnceFileFilter" />
            </beans:list>
        </beans:constructor-arg>
    </beans:bean>

    <file:inbound-channel-adapter
            auto-startup="false"
            channel="file" directory="${dir}" filter="compositeFileFilter">
        <poller fixed-delay="${fixedDelay}" time-unit="SECONDS"/>
    </file:inbound-channel-adapter>

    <channel id="file"/>

    <chain id="extractContents" input-channel="file" output-channel="output">
        <header-enricher>
            <header name="contentType" value="application/octet-stream"/>
        </header-enricher>
        <file:file-to-bytes-transformer/>
    </chain>

    <channel id="output"/>

我遇到的问题是,如果 redis 容器出现故障,persistentAcceptOnceFileFilter 将在每次调用时抛出异常,将它们放入 errorChannel。

不幸的是,在我的组织内,errorChannel 中发布的所有消息都被一个服务接收,该服务向团队发送电子邮件并在我们的票务系统中发布故障单,这意味着如果 redis 在夜间出现故障,我们的票务系统将受到数万条消息的冲击,一些可怜的团队成员将不得不通过并关闭这些消息。

为了解决这个问题,我想到了使用错误处理来扩展 FileSystemPersistentAcceptOnceFileListFilter,这样我们就不会进入错误通道,而是简单地拒绝检测到的所有文件并在 xd-container 日志中留下消息,但这是一个我不喜欢的解决方案,因为我想通过配置完全解决这个问题,而不完全绕过我们的 ticketing/notification 系统。相反,我更愿意允许单个发送到 errorChannel(所以我们得到一个 notification/ticket),然后停止这个模块是其组件的流(或者至少,功能上等效的东西,最好通过配置)。有没有办法做到这一点?

因为我没有看到自定义 error-channel,我假设您的票证系统是由日志子系统驱动的(通过默认 errorChannel 及其日志处理程序订阅者)。

由于默认 errorChannel 是 pub/sub,您可以订阅第二个流...

<int:transformer input-channel="errorChannel" expression="@fileChannelAdapter.stop()"
    output-channel="control" />

<int:control-bus input-channel="control" />

并在文件通道适配器上设置 id="fileChannelAdapter"