Spring XD源故障处理
Spring XD source failure handling
我有一个流,其中包含一个与 source:file 模块行为相似的模块,除了使用 redisMetaDataStore 和 Spring Integration 的 FileSystemPersistentAcceptOnceFileListFilter 仅检测一次文件之外,以便模块记住在容器重新启动之间已经读取了哪些文件。
文件-persisting.xml
<beans:bean id="redisMetadataStore" class="org.springframework.integration.redis.metadata.RedisMetadataStore" >
<beans:constructor-arg ref="redisConnectionFactory" />
</beans:bean>
<beans:bean id="persistentAcceptOnceFileFilter" class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
<beans:constructor-arg ref="redisMetadataStore" />
<beans:constructor-arg value="${metadataPrefix}" />
</beans:bean>
<beans:bean id="antPatternFileFilter" class="org.springframework.integration.file.filters.SimplePatternFileListFilter">
<beans:constructor-arg value="${pattern}" />
</beans:bean>
<beans:bean id="compositeFileFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
<beans:constructor-arg>
<beans:list>
<beans:ref bean="antPatternFileFilter" />
<beans:ref bean="persistentAcceptOnceFileFilter" />
</beans:list>
</beans:constructor-arg>
</beans:bean>
<file:inbound-channel-adapter
auto-startup="false"
channel="file" directory="${dir}" filter="compositeFileFilter">
<poller fixed-delay="${fixedDelay}" time-unit="SECONDS"/>
</file:inbound-channel-adapter>
<channel id="file"/>
<chain id="extractContents" input-channel="file" output-channel="output">
<header-enricher>
<header name="contentType" value="application/octet-stream"/>
</header-enricher>
<file:file-to-bytes-transformer/>
</chain>
<channel id="output"/>
我遇到的问题是,如果 redis 容器出现故障,persistentAcceptOnceFileFilter 将在每次调用时抛出异常,将它们放入 errorChannel。
不幸的是,在我的组织内,errorChannel 中发布的所有消息都被一个服务接收,该服务向团队发送电子邮件并在我们的票务系统中发布故障单,这意味着如果 redis 在夜间出现故障,我们的票务系统将受到数万条消息的冲击,一些可怜的团队成员将不得不通过并关闭这些消息。
为了解决这个问题,我想到了使用错误处理来扩展 FileSystemPersistentAcceptOnceFileListFilter,这样我们就不会进入错误通道,而是简单地拒绝检测到的所有文件并在 xd-container 日志中留下消息,但这是一个我不喜欢的解决方案,因为我想通过配置完全解决这个问题,而不完全绕过我们的 ticketing/notification 系统。相反,我更愿意允许单个发送到 errorChannel(所以我们得到一个 notification/ticket),然后停止这个模块是其组件的流(或者至少,功能上等效的东西,最好通过配置)。有没有办法做到这一点?
因为我没有看到自定义 error-channel
,我假设您的票证系统是由日志子系统驱动的(通过默认 errorChannel
及其日志处理程序订阅者)。
由于默认 errorChannel
是 pub/sub,您可以订阅第二个流...
<int:transformer input-channel="errorChannel" expression="@fileChannelAdapter.stop()"
output-channel="control" />
<int:control-bus input-channel="control" />
并在文件通道适配器上设置 id="fileChannelAdapter"
。
我有一个流,其中包含一个与 source:file 模块行为相似的模块,除了使用 redisMetaDataStore 和 Spring Integration 的 FileSystemPersistentAcceptOnceFileListFilter 仅检测一次文件之外,以便模块记住在容器重新启动之间已经读取了哪些文件。
文件-persisting.xml
<beans:bean id="redisMetadataStore" class="org.springframework.integration.redis.metadata.RedisMetadataStore" >
<beans:constructor-arg ref="redisConnectionFactory" />
</beans:bean>
<beans:bean id="persistentAcceptOnceFileFilter" class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
<beans:constructor-arg ref="redisMetadataStore" />
<beans:constructor-arg value="${metadataPrefix}" />
</beans:bean>
<beans:bean id="antPatternFileFilter" class="org.springframework.integration.file.filters.SimplePatternFileListFilter">
<beans:constructor-arg value="${pattern}" />
</beans:bean>
<beans:bean id="compositeFileFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
<beans:constructor-arg>
<beans:list>
<beans:ref bean="antPatternFileFilter" />
<beans:ref bean="persistentAcceptOnceFileFilter" />
</beans:list>
</beans:constructor-arg>
</beans:bean>
<file:inbound-channel-adapter
auto-startup="false"
channel="file" directory="${dir}" filter="compositeFileFilter">
<poller fixed-delay="${fixedDelay}" time-unit="SECONDS"/>
</file:inbound-channel-adapter>
<channel id="file"/>
<chain id="extractContents" input-channel="file" output-channel="output">
<header-enricher>
<header name="contentType" value="application/octet-stream"/>
</header-enricher>
<file:file-to-bytes-transformer/>
</chain>
<channel id="output"/>
我遇到的问题是,如果 redis 容器出现故障,persistentAcceptOnceFileFilter 将在每次调用时抛出异常,将它们放入 errorChannel。
不幸的是,在我的组织内,errorChannel 中发布的所有消息都被一个服务接收,该服务向团队发送电子邮件并在我们的票务系统中发布故障单,这意味着如果 redis 在夜间出现故障,我们的票务系统将受到数万条消息的冲击,一些可怜的团队成员将不得不通过并关闭这些消息。
为了解决这个问题,我想到了使用错误处理来扩展 FileSystemPersistentAcceptOnceFileListFilter,这样我们就不会进入错误通道,而是简单地拒绝检测到的所有文件并在 xd-container 日志中留下消息,但这是一个我不喜欢的解决方案,因为我想通过配置完全解决这个问题,而不完全绕过我们的 ticketing/notification 系统。相反,我更愿意允许单个发送到 errorChannel(所以我们得到一个 notification/ticket),然后停止这个模块是其组件的流(或者至少,功能上等效的东西,最好通过配置)。有没有办法做到这一点?
因为我没有看到自定义 error-channel
,我假设您的票证系统是由日志子系统驱动的(通过默认 errorChannel
及其日志处理程序订阅者)。
由于默认 errorChannel
是 pub/sub,您可以订阅第二个流...
<int:transformer input-channel="errorChannel" expression="@fileChannelAdapter.stop()"
output-channel="control" />
<int:control-bus input-channel="control" />
并在文件通道适配器上设置 id="fileChannelAdapter"
。