配置不丢失消息
Configuration not to loose messages
我有 spring xd 组件称为 throttle,我在其中存储消息,post 是并限制它们并发送它们 slowly.I am 运行 我的 xd in 3 containers.I 对延迟器有延迟,因为 0.When 一个容器去 down.I 松动 messages.Below 是我的 configuration.Looks 就像它不存储到 postgre 和松动内存或缓存。
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int-groovy="http://www.springframework.org/schema/integration/groovy"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xmlns:ehcache="http://www.springframework.org/schema/cache"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/integration/jdbc
http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
http://www.springframework.org/schema/integration/groovy
http://www.springframework.org/schema/integration/groovy/spring-integration-groovy.xsd
http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-4.1.xsd">
<ehcache:annotation-driven cache-manager="cacheManager" />
<context:property-placeholder
location="classpath*:dms-security-${region}.properties, classpath*:dms-jms-${region}.properties, classpath*:dms-mongo-${region}.properties" />
<context:component-scan base-package="com.testhrottle.api.dms.*"
use-default-filters="false">
<context:include-filter type="annotation"
expression="org.springframework.context.annotation.Configuration" />
</context:component-scan>
<channel id="input">
<!-- <interceptors>
<ref bean="messageInterceptor" />
</interceptors> -->
</channel>
<channel id="output" />
<channel id="sms" />
<channel id="eml" />
<channel id="del" />
<!-- <header-value-router input-channel="input" header-name="Channel-Type" >
<mapping value="PHN" channel="sms" />
<mapping value="EML" channel="eml" />
<mapping value="DEL" channel="del" />
</header-value-router> -->
<router input-channel="input">
<beans:bean name="messageRouter" class="com.testhrottle.xd.util.MessageRouter">
<beans:property name="cache" ref="configCache" />
</beans:bean>
</router>
<service-activator input-channel="del" ref="delete" method="delete"/>
<filter input-channel="eml" output-channel="output">
<beans:bean class="com.testhrottle.xd.throttle.Throttle">
<beans:constructor-arg value="${tpsThreshold}" />
<beans:constructor-arg value="${spanSeconds}" />
<beans:property name="cache" ref="configCache" />
<beans:property name="dailyLimitCounter" ref="dailyLimitCounter" />
<beans:property name="errorPublishing" ref="errorPublishing" />
</beans:bean>
</filter>
<filter input-channel="sms" output-channel="output">
<beans:bean class="com.testhrottle.xd.throttle.SmsThrottle">
<beans:constructor-arg value="${smsTpsThreshold}" />
<beans:constructor-arg value="${smsSpanSeconds}" />
<beans:property name="cache" ref="configCache" />
<beans:property name="dailyLimitCounter" ref="dailyLimitCounter" />
<beans:property name="errorPublishing" ref="errorPublishing" />
</beans:bean>
</filter>
<channel id="requeue">
<dispatcher task-executor="taskExecutor" />
</channel>
<task:executor id="taskExecutor" pool-size="1" />
<delayer id="delayer" input-channel="requeue" default-delay="${holdingDelay}"
expression="headers['delay']" message-store="messageStore"
output-channel="input" />
<!-- <int-jdbc:message-store id="messageStore" data-source="dataSource" /> -->
<beans:bean id="messageStore" class="com.testhrottle.xd.util.CustomJdbcMessageStore">
<beans:constructor-arg ref="dataSource"/>
</beans:bean>
<!-- <beans:bean name="messageInterceptor" class="com.testhrottle.xd.util.MessageInterceptor">
<beans:property name="cache" ref="configCache" />
</beans:bean> -->
<beans:bean name="configCache"
class="com.testhrottle.xd.cache.ConfigCache">
<beans:property name="jdbcTemplate" ref="jdbcTemplate" />
<beans:property name="holdRetryIntervelInSeconds" value="${holdRetryIntervelInSeconds}" />
<beans:property name="configQuery" value="select alrt_type_cd alertType,dlvry_days_of_week daysOfWeek,DLVRY_STRT_TM startTime,DLVRY_END_TM endTime,DLVRY_STRT_DT startDate,DLVRY_END_DT endDate,hold_ind holdFlag, max_msgs_per_day maxMsgPerDay,purge_ind purge from ${schema}.alrt_type where alrt_type_cd = ?" />
</beans:bean>
<beans:bean id="dataSource"
class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<beans:property name="driverClassName" value="org.postgresql.Driver"/>
<beans:property name="url" value="${dbURL}"/>
<beans:property name="username" value="${userName}"/>
<beans:property name="password" value="#{encryptedDatum.decryptBase64Encoded('${passWord}')}"/>
</beans:bean>
<beans:bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
<beans:property name="dataSource">
<beans:ref bean="dataSource" />
</beans:property>
</beans:bean>
<beans:bean id="encryptedDatum" class="com.testhrottle.api.dms.core.security.EncryptedSecuredDatum"/>
<!-- <beans:bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"/> -->
<beans:bean id="errorPublishing" class="com.testhrottle.xd.util.ErrorPublishing">
<!-- <beans:property name="rabbitTemplate" ref="rabbitTemplate" /> -->
</beans:bean>
<beans:bean id="delete" class="com.testhrottle.xd.util.Delete"/>
<!--
<beans:bean id="dataSource"
class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<beans:property name="driverClassName" value="org.postgresql.Driver"/>
<beans:property name="url" value="jdbc:postgresql://localhost:5432/postgres"/>
<beans:property name="username" value="postgres"/>
<beans:property name="password" value="postgres"/>
</beans:bean>
<beans:bean name="configCache"
class="com.testhrottle.xd.cache.ConfigCache">
<beans:property name="jdbcTemplate" ref="jdbcTemplate" />
<beans:property name="holdRetryIntervelInSeconds" value="10" />
<beans:property name="configQuery" value="select alrt_type_cd alertType,dlvry_days_of_week daysOfWeek,DLVRY_STRT_TM startTime,DLVRY_END_TM endTime,DLVRY_STRT_DT startDate,DLVRY_END_DT endDate,hold_ind holdFlag, max_msgs_per_day maxMsgPerDay from dms_wb.alrt_type where alrt_type_cd = ?;
" />
</beans:bean>
<beans:bean name="cacheTest"
class="com.testhrottle.xd.cache.CacheTest">
<beans:property name="cache" ref="configCache" />
</beans:bean> -->
<beans:bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
<beans:bean id="cacheManager" class="org.springframework.cache.ehcache.EhCacheCacheManager">
<beans:property name="cacheManager" ref="ehcache" />
</beans:bean>
<beans:bean id="ehcache"
class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean">
<beans:property name="configLocation" value="classpath:ehcache.xml" />
<beans:property name="shared" value="true" />
</beans:bean>
<beans:bean name="dailyLimitCounter" class="com.testhrottle.xd.throttle.DailyLimitCounter"/>
<task:scheduler id="taskSchedulerInbound" pool-size="10" />
<task:scheduled-tasks>
<task:scheduled ref="dailyLimitCounter" method="clearAlertDailyCount" cron="59 59 23 * * *" />
</task:scheduled-tasks>
</beans:beans>
所以如果我删除它它应该在同一个线程中工作。
<channel id="requeue">
<dispatcher task-executor="taskExecutor" />
</channel>
<task:executor id="taskExecutor" pool-size="1" />
看起来你的 <dispatcher task-executor="taskExecutor" />
有罪。
由于您跳出了消息总线线程,ack
s(提交)了 Broker 上的消息,但是节点的崩溃并没有影响(回滚),因为您的消息已移至不同线程。而且它还没有到达您的数据库以在那里提交。
一般来说,最好在 Spring XD 中使用容器 (XD) 线程执行所有操作并依赖消息总线持久机制。
或者手动将消息提交到数据库,但在同一个 XD 线程中。
有道理吗?
我有 spring xd 组件称为 throttle,我在其中存储消息,post 是并限制它们并发送它们 slowly.I am 运行 我的 xd in 3 containers.I 对延迟器有延迟,因为 0.When 一个容器去 down.I 松动 messages.Below 是我的 configuration.Looks 就像它不存储到 postgre 和松动内存或缓存。
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int-groovy="http://www.springframework.org/schema/integration/groovy"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xmlns:ehcache="http://www.springframework.org/schema/cache"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/integration/jdbc
http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
http://www.springframework.org/schema/integration/groovy
http://www.springframework.org/schema/integration/groovy/spring-integration-groovy.xsd
http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-4.1.xsd">
<ehcache:annotation-driven cache-manager="cacheManager" />
<context:property-placeholder
location="classpath*:dms-security-${region}.properties, classpath*:dms-jms-${region}.properties, classpath*:dms-mongo-${region}.properties" />
<context:component-scan base-package="com.testhrottle.api.dms.*"
use-default-filters="false">
<context:include-filter type="annotation"
expression="org.springframework.context.annotation.Configuration" />
</context:component-scan>
<channel id="input">
<!-- <interceptors>
<ref bean="messageInterceptor" />
</interceptors> -->
</channel>
<channel id="output" />
<channel id="sms" />
<channel id="eml" />
<channel id="del" />
<!-- <header-value-router input-channel="input" header-name="Channel-Type" >
<mapping value="PHN" channel="sms" />
<mapping value="EML" channel="eml" />
<mapping value="DEL" channel="del" />
</header-value-router> -->
<router input-channel="input">
<beans:bean name="messageRouter" class="com.testhrottle.xd.util.MessageRouter">
<beans:property name="cache" ref="configCache" />
</beans:bean>
</router>
<service-activator input-channel="del" ref="delete" method="delete"/>
<filter input-channel="eml" output-channel="output">
<beans:bean class="com.testhrottle.xd.throttle.Throttle">
<beans:constructor-arg value="${tpsThreshold}" />
<beans:constructor-arg value="${spanSeconds}" />
<beans:property name="cache" ref="configCache" />
<beans:property name="dailyLimitCounter" ref="dailyLimitCounter" />
<beans:property name="errorPublishing" ref="errorPublishing" />
</beans:bean>
</filter>
<filter input-channel="sms" output-channel="output">
<beans:bean class="com.testhrottle.xd.throttle.SmsThrottle">
<beans:constructor-arg value="${smsTpsThreshold}" />
<beans:constructor-arg value="${smsSpanSeconds}" />
<beans:property name="cache" ref="configCache" />
<beans:property name="dailyLimitCounter" ref="dailyLimitCounter" />
<beans:property name="errorPublishing" ref="errorPublishing" />
</beans:bean>
</filter>
<channel id="requeue">
<dispatcher task-executor="taskExecutor" />
</channel>
<task:executor id="taskExecutor" pool-size="1" />
<delayer id="delayer" input-channel="requeue" default-delay="${holdingDelay}"
expression="headers['delay']" message-store="messageStore"
output-channel="input" />
<!-- <int-jdbc:message-store id="messageStore" data-source="dataSource" /> -->
<beans:bean id="messageStore" class="com.testhrottle.xd.util.CustomJdbcMessageStore">
<beans:constructor-arg ref="dataSource"/>
</beans:bean>
<!-- <beans:bean name="messageInterceptor" class="com.testhrottle.xd.util.MessageInterceptor">
<beans:property name="cache" ref="configCache" />
</beans:bean> -->
<beans:bean name="configCache"
class="com.testhrottle.xd.cache.ConfigCache">
<beans:property name="jdbcTemplate" ref="jdbcTemplate" />
<beans:property name="holdRetryIntervelInSeconds" value="${holdRetryIntervelInSeconds}" />
<beans:property name="configQuery" value="select alrt_type_cd alertType,dlvry_days_of_week daysOfWeek,DLVRY_STRT_TM startTime,DLVRY_END_TM endTime,DLVRY_STRT_DT startDate,DLVRY_END_DT endDate,hold_ind holdFlag, max_msgs_per_day maxMsgPerDay,purge_ind purge from ${schema}.alrt_type where alrt_type_cd = ?" />
</beans:bean>
<beans:bean id="dataSource"
class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<beans:property name="driverClassName" value="org.postgresql.Driver"/>
<beans:property name="url" value="${dbURL}"/>
<beans:property name="username" value="${userName}"/>
<beans:property name="password" value="#{encryptedDatum.decryptBase64Encoded('${passWord}')}"/>
</beans:bean>
<beans:bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
<beans:property name="dataSource">
<beans:ref bean="dataSource" />
</beans:property>
</beans:bean>
<beans:bean id="encryptedDatum" class="com.testhrottle.api.dms.core.security.EncryptedSecuredDatum"/>
<!-- <beans:bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"/> -->
<beans:bean id="errorPublishing" class="com.testhrottle.xd.util.ErrorPublishing">
<!-- <beans:property name="rabbitTemplate" ref="rabbitTemplate" /> -->
</beans:bean>
<beans:bean id="delete" class="com.testhrottle.xd.util.Delete"/>
<!--
<beans:bean id="dataSource"
class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<beans:property name="driverClassName" value="org.postgresql.Driver"/>
<beans:property name="url" value="jdbc:postgresql://localhost:5432/postgres"/>
<beans:property name="username" value="postgres"/>
<beans:property name="password" value="postgres"/>
</beans:bean>
<beans:bean name="configCache"
class="com.testhrottle.xd.cache.ConfigCache">
<beans:property name="jdbcTemplate" ref="jdbcTemplate" />
<beans:property name="holdRetryIntervelInSeconds" value="10" />
<beans:property name="configQuery" value="select alrt_type_cd alertType,dlvry_days_of_week daysOfWeek,DLVRY_STRT_TM startTime,DLVRY_END_TM endTime,DLVRY_STRT_DT startDate,DLVRY_END_DT endDate,hold_ind holdFlag, max_msgs_per_day maxMsgPerDay from dms_wb.alrt_type where alrt_type_cd = ?;
" />
</beans:bean>
<beans:bean name="cacheTest"
class="com.testhrottle.xd.cache.CacheTest">
<beans:property name="cache" ref="configCache" />
</beans:bean> -->
<beans:bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
<beans:bean id="cacheManager" class="org.springframework.cache.ehcache.EhCacheCacheManager">
<beans:property name="cacheManager" ref="ehcache" />
</beans:bean>
<beans:bean id="ehcache"
class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean">
<beans:property name="configLocation" value="classpath:ehcache.xml" />
<beans:property name="shared" value="true" />
</beans:bean>
<beans:bean name="dailyLimitCounter" class="com.testhrottle.xd.throttle.DailyLimitCounter"/>
<task:scheduler id="taskSchedulerInbound" pool-size="10" />
<task:scheduled-tasks>
<task:scheduled ref="dailyLimitCounter" method="clearAlertDailyCount" cron="59 59 23 * * *" />
</task:scheduled-tasks>
</beans:beans>
所以如果我删除它它应该在同一个线程中工作。
<channel id="requeue">
<dispatcher task-executor="taskExecutor" />
</channel>
<task:executor id="taskExecutor" pool-size="1" />
看起来你的 <dispatcher task-executor="taskExecutor" />
有罪。
由于您跳出了消息总线线程,ack
s(提交)了 Broker 上的消息,但是节点的崩溃并没有影响(回滚),因为您的消息已移至不同线程。而且它还没有到达您的数据库以在那里提交。
一般来说,最好在 Spring XD 中使用容器 (XD) 线程执行所有操作并依赖消息总线持久机制。 或者手动将消息提交到数据库,但在同一个 XD 线程中。
有道理吗?