配置不丢失消息

Configuration not to loose messages

我有 spring xd 组件称为 throttle,我在其中存储消息,post 是并限制它们并发送它们 slowly.I am 运行 我的 xd in 3 containers.I 对延迟器有延迟,因为 0.When 一个容器去 down.I 松动 messages.Below 是我的 configuration.Looks 就像它不存储到 postgre 和松动内存或缓存。

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:beans="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:int-groovy="http://www.springframework.org/schema/integration/groovy"
    xmlns:task="http://www.springframework.org/schema/task" 
    xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"
    xmlns:ehcache="http://www.springframework.org/schema/cache"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
        http://www.springframework.org/schema/integration/jdbc 
        http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
        http://www.springframework.org/schema/integration/groovy
        http://www.springframework.org/schema/integration/groovy/spring-integration-groovy.xsd
        http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-4.1.xsd">



    <ehcache:annotation-driven cache-manager="cacheManager" />

    <context:property-placeholder
        location="classpath*:dms-security-${region}.properties, classpath*:dms-jms-${region}.properties, classpath*:dms-mongo-${region}.properties" />

    <context:component-scan base-package="com.testhrottle.api.dms.*"
        use-default-filters="false">
        <context:include-filter type="annotation"
            expression="org.springframework.context.annotation.Configuration" />
    </context:component-scan>

    <channel id="input">
        <!-- <interceptors>
            <ref bean="messageInterceptor" />
        </interceptors> -->
    </channel>

    <channel id="output" />

    <channel id="sms" />

    <channel id="eml" />

    <channel id="del" />

    <!-- <header-value-router input-channel="input" header-name="Channel-Type" >
        <mapping value="PHN" channel="sms" />
        <mapping value="EML" channel="eml" />
        <mapping value="DEL" channel="del" />
    </header-value-router> -->

    <router input-channel="input">
        <beans:bean name="messageRouter" class="com.testhrottle.xd.util.MessageRouter">
            <beans:property name="cache" ref="configCache" />
        </beans:bean>
    </router>

    <service-activator input-channel="del" ref="delete" method="delete"/>

    <filter input-channel="eml" output-channel="output">
        <beans:bean class="com.testhrottle.xd.throttle.Throttle">
            <beans:constructor-arg value="${tpsThreshold}" />
            <beans:constructor-arg value="${spanSeconds}" />
            <beans:property name="cache" ref="configCache" />
            <beans:property name="dailyLimitCounter" ref="dailyLimitCounter" />
            <beans:property name="errorPublishing" ref="errorPublishing" />
        </beans:bean>
    </filter>

    <filter input-channel="sms" output-channel="output">
        <beans:bean class="com.testhrottle.xd.throttle.SmsThrottle">
            <beans:constructor-arg value="${smsTpsThreshold}" />
            <beans:constructor-arg value="${smsSpanSeconds}" />
            <beans:property name="cache" ref="configCache" />
            <beans:property name="dailyLimitCounter" ref="dailyLimitCounter" />
            <beans:property name="errorPublishing" ref="errorPublishing" />
        </beans:bean>
    </filter>

    <channel id="requeue">
        <dispatcher task-executor="taskExecutor" />
    </channel>

    <task:executor id="taskExecutor" pool-size="1" />

    <delayer id="delayer" input-channel="requeue" default-delay="${holdingDelay}"
        expression="headers['delay']" message-store="messageStore"
        output-channel="input" />


    <!-- <int-jdbc:message-store id="messageStore" data-source="dataSource" /> --> 

    <beans:bean id="messageStore" class="com.testhrottle.xd.util.CustomJdbcMessageStore">
      <beans:constructor-arg ref="dataSource"/>
   </beans:bean>

    <!-- <beans:bean name="messageInterceptor" class="com.testhrottle.xd.util.MessageInterceptor">
        <beans:property name="cache" ref="configCache" />
    </beans:bean> -->

    <beans:bean name="configCache"
        class="com.testhrottle.xd.cache.ConfigCache">
        <beans:property name="jdbcTemplate" ref="jdbcTemplate" />
        <beans:property name="holdRetryIntervelInSeconds" value="${holdRetryIntervelInSeconds}" />
        <beans:property name="configQuery" value="select alrt_type_cd alertType,dlvry_days_of_week daysOfWeek,DLVRY_STRT_TM startTime,DLVRY_END_TM endTime,DLVRY_STRT_DT startDate,DLVRY_END_DT endDate,hold_ind holdFlag, max_msgs_per_day maxMsgPerDay,purge_ind purge from ${schema}.alrt_type where alrt_type_cd = ?" />
    </beans:bean>

    <beans:bean id="dataSource" 
    class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <beans:property name="driverClassName" value="org.postgresql.Driver"/>
        <beans:property name="url" value="${dbURL}"/>
        <beans:property name="username" value="${userName}"/>
        <beans:property name="password" value="#{encryptedDatum.decryptBase64Encoded('${passWord}')}"/>
    </beans:bean>

    <beans:bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <beans:property name="dataSource">
            <beans:ref bean="dataSource" />
        </beans:property>
    </beans:bean>

    <beans:bean id="encryptedDatum" class="com.testhrottle.api.dms.core.security.EncryptedSecuredDatum"/>

    <!-- <beans:bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"/> -->

    <beans:bean id="errorPublishing" class="com.testhrottle.xd.util.ErrorPublishing">
        <!-- <beans:property name="rabbitTemplate" ref="rabbitTemplate" /> -->
    </beans:bean>

    <beans:bean id="delete" class="com.testhrottle.xd.util.Delete"/>

    <!-- 
    <beans:bean id="dataSource" 
    class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <beans:property name="driverClassName" value="org.postgresql.Driver"/>
        <beans:property name="url" value="jdbc:postgresql://localhost:5432/postgres"/>
        <beans:property name="username" value="postgres"/>
        <beans:property name="password" value="postgres"/>
    </beans:bean>
    <beans:bean name="configCache"
        class="com.testhrottle.xd.cache.ConfigCache">
        <beans:property name="jdbcTemplate" ref="jdbcTemplate" />
        <beans:property name="holdRetryIntervelInSeconds" value="10" />
        <beans:property name="configQuery" value="select alrt_type_cd alertType,dlvry_days_of_week daysOfWeek,DLVRY_STRT_TM startTime,DLVRY_END_TM endTime,DLVRY_STRT_DT startDate,DLVRY_END_DT endDate,hold_ind holdFlag, max_msgs_per_day maxMsgPerDay from dms_wb.alrt_type where alrt_type_cd = ?; 
        " />
    </beans:bean>

    <beans:bean name="cacheTest"
        class="com.testhrottle.xd.cache.CacheTest">
        <beans:property name="cache" ref="configCache" />
    </beans:bean> -->

    <beans:bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>


    <beans:bean id="cacheManager" class="org.springframework.cache.ehcache.EhCacheCacheManager">
        <beans:property name="cacheManager" ref="ehcache" />
    </beans:bean>

    <beans:bean id="ehcache"
        class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean">
        <beans:property name="configLocation" value="classpath:ehcache.xml" />
        <beans:property name="shared" value="true" />
    </beans:bean>

    <beans:bean name="dailyLimitCounter" class="com.testhrottle.xd.throttle.DailyLimitCounter"/>

    <task:scheduler id="taskSchedulerInbound" pool-size="10" />

    <task:scheduled-tasks>
        <task:scheduled ref="dailyLimitCounter" method="clearAlertDailyCount" cron="59 59 23 * * *" />
    </task:scheduled-tasks>

</beans:beans>

所以如果我删除它它应该在同一个线程中工作。

<channel id="requeue">
   <dispatcher task-executor="taskExecutor" />
</channel>

<task:executor id="taskExecutor" pool-size="1" />

看起来你的 <dispatcher task-executor="taskExecutor" /> 有罪。

由于您跳出了消息总线线程,acks(提交)了 Broker 上的消息,但是节点的崩溃并没有影响(回滚),因为您的消息已移至不同线程。而且它还没有到达您的数据库以在那里提交。

一般来说,最好在 Spring XD 中使用容器 (XD) 线程执行所有操作并依赖消息总线持久机制。 或者手动将消息提交到数据库,但在同一个 XD 线程中。

有道理吗?