如何在出站->入站->httpgateway(此处消息失败)->外部服务的消息失败的情况下使用 RedeliveryPolicy
How to use the RedeliveryPolicy in case of failed messages for outbound->inbound->httpgateway(messages failing here)->EXTERNAL SERVICE
如何在出站->入站->httpoutboundgateway(消息在此处失败)->外部服务中出现失败消息时使用 RedeliveryPolicy
失败的消息不会根据 RedeliveryPolicy 进行重试,主要是因为在异常发生之前消息已经出队。
<!-- Producer side: messages placed on jmsOutChannel are written to the JMS queue. -->
<int:channel id="jmsOutChannel"/>

<!-- Sends every message from jmsOutChannel through jmsTemplate to the queue
     whose name is given by the SpEL expression on somebean.queueName. -->
<jms:outbound-channel-adapter
    id="outboundJMSAdaptor"
    channel="jmsOutChannel"
    jms-template="jmsTemplate"
    destination-name="#{somebean.queueName}"/>
<!-- Consumer side: messages read from the JMS queue are placed on jmsInChannel. -->
<int:channel id="jmsInChannel"/>

<!-- Listens on the same queue the outbound adapter writes to.
     acknowledge="transacted" makes the downstream flow run inside the JMS
     session transaction, so an exception thrown further down (for example by
     the HTTP outbound gateway) rolls the receive back and the message is
     redelivered according to the RedeliveryPolicy instead of being lost.
     This is the default from Spring Integration 4.2 onwards; it is spelled
     out here so that earlier versions behave the same way. -->
<jms:message-driven-channel-adapter
    channel="jmsInChannel"
    destination-name="#{somebean.queueName}"
    connection-factory="jmsConnectionFactory"
    message-converter="jmsMessageConverter"
    acknowledge="transacted"/>
<!-- Adds the headers used by the HTTP call to each message coming from
     jmsInChannel and forwards the result to outbound_gateway_channel. -->
<int:header-enricher
    input-channel="jmsInChannel"
    output-channel="outbound_gateway_channel">

  <!-- Identifier taken from the payload; consumed later as the "id" URI variable. -->
  <int:header name="addressId" expression="payload.getId()"/>

  <!-- Fixed request metadata. -->
  <int:header name="Accept-Language" value="en_GB"/>
  <int:header name="X-Source-CountryCode" value="GB"/>
  <int:header name="X-Source-Operator" value="Enterprise"/>
  <int:header name="X-Source-Division" value="CustomerManagement"/>
  <int:header name="X-Source-System" value="${sapwebservices.http.header.source.system}"/>

  <!-- Timestamp computed per message; formatted in the JVM default time zone. -->
  <int:header name="X-Source-Timestamp"
              expression="new java.text.SimpleDateFormat('yyyy-MM-dd HH:mm:ss').format(new java.util.Date())"/>

  <!-- Content negotiation headers for the JSON request. -->
  <int:header name="Accept" value="application/json"/>
  <int:header name="Content-Type" value="application/json;charset=UTF-8"/>
</int:header-enricher>
<!-- Request factory used by the HTTP outbound gateway.
     The custom httpClient is injected first on purpose: properties are applied
     in declaration order, and older Spring versions apply the timeouts to
     whichever HttpClient is current when the timeout setter runs. Setting the
     client last could therefore drop the timeouts on those versions.
     Both timeouts share the same property value. -->
<bean id="httpRequestFactory"
      class="org.springframework.http.client.HttpComponentsClientHttpRequestFactory">
  <property name="httpClient" ref="httpClient"/>
  <property name="connectTimeout" value="${sapwebservices.http.rest.timeout}"/>
  <property name="readTimeout" value="${sapwebservices.http.rest.timeout}"/>
</bean>
<!-- Serialises the enriched message payload to JSON with the
     nonNullObjectMapper bean before it reaches the HTTP gateway. -->
<int:object-to-json-transformer
    object-mapper="nonNullObjectMapper"
    input-channel="outbound_gateway_channel"
    output-channel="outbound_gateway_with_json"/>
<!-- Sends the JSON payload to the external service with HTTP PUT and routes
     the String response to print_payload. Only the message headers that match
     mapped-request-headers are copied onto the HTTP request. -->
<http:outbound-gateway
    request-channel="outbound_gateway_with_json"
    reply-channel="print_payload"
    url="${sapwebservices.ws.uri.updatecustomershippingaddress}"
    http-method="PUT"
    charset="UTF-8"
    expected-response-type="java.lang.String"
    mapped-request-headers="Accept*, Content-Type, X-*, HTTP_REQUEST_HEADERS"
    request-factory="httpRequestFactory">

  <!-- NOTE(review): presumably the configured URL contains an {id} placeholder; confirm. -->
  <http:uri-variable name="id" expression="headers['addressId']"/>
</http:outbound-gateway>
以下是我的重新投递策略(RedeliveryPolicy)相关配置:
<!-- JmsTemplate used by the outbound channel adapter. It wraps the ActiveMQ
     connection factory in a SingleConnectionFactory.
     The target factory is referenced with ref="..." rather than
     <ref local="..."/>, because the "local" attribute is no longer accepted
     by the spring-beans 4.0+ schema; ref="..." resolves the same bean. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="messageConverter" ref="jmsMessageConverter"/>
  <property name="connectionFactory">
    <bean class="org.springframework.jms.connection.SingleConnectionFactory">
      <property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
    </bean>
  </property>
</bean>
<!-- ActiveMQ connection factory shared by the JmsTemplate and the
     message-driven channel adapter. It carries the redeliveryPolicy bean. -->
<bean id="jmsConnectionFactory"
      class="org.apache.activemq.spring.ActiveMQConnectionFactory">
  <!-- NOTE(review): broker address and credentials are hard-coded here while
       other settings use property placeholders; consider externalising. -->
  <property name="brokerURL" value="tcp://localhost:61616"/>
  <property name="userName" value="admin"/>
  <property name="password" value="admin"/>

  <!-- Redelivery settings applied to consumers created from this factory. -->
  <property name="redeliveryPolicy" ref="redeliveryPolicy"/>
  <property name="nonBlockingRedelivery" value="true"/>
</bean>
<!-- Redelivery policy referenced by jmsConnectionFactory. Every value comes
     from a sapwebservice.redeliveryPolicy.* property placeholder. -->
<bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
  <!-- Number of redelivery attempts. -->
  <property name="maximumRedeliveries"
            value="${sapwebservice.redeliveryPolicy.maximumRedeliveries}"/>

  <!-- Delay settings. -->
  <property name="initialRedeliveryDelay"
            value="${sapwebservice.redeliveryPolicy.initialRedeliveryDelay}"/>
  <property name="redeliveryDelay"
            value="${sapwebservice.redeliveryPolicy.redeliveryDelay}"/>

  <!-- Back-off settings. -->
  <property name="useExponentialBackOff"
            value="${sapwebservice.redeliveryPolicy.useExponentialBackOff}"/>
  <property name="backOffMultiplier"
            value="${sapwebservice.redeliveryPolicy.backOffMultiplier}"/>
</bean>
<!-- Queue destination named "demo.queue". It is not referenced by any other
     bean in the configuration shown here. -->
<bean id="demoQueue" class="org.apache.activemq.command.ActiveMQQueue">
  <constructor-arg value="demo.queue"/>
</bean>

<!-- Project-specific message converter used by the JmsTemplate and by the
     message-driven channel adapter. -->
<bean id="jmsMessageConverter" class="sap.converter.JMSMessageConverter"/>
<!-- Embedded, non-persistent ActiveMQ broker with JMX enabled.
     NOTE(review): the transport connector binds to tcp://localhost:0 while
     jmsConnectionFactory connects to tcp://localhost:61616; confirm that
     both are meant to address the same broker. -->
<amq:broker persistent="false" useJmx="true">

  <amq:destinationPolicy>
    <amq:policyMap>
      <amq:defaultEntry>
        <!-- ">" is the wildcard for every queue: each queue gets its own
             dead-letter queue whose name starts with "dlq.". -->
        <amq:policyEntry queue=">">
          <amq:deadLetterStrategy>
            <amq:individualDeadLetterStrategy
                queuePrefix="dlq."
                useQueueForQueueMessages="true"/>
          </amq:deadLetterStrategy>
        </amq:policyEntry>
      </amq:defaultEntry>
    </amq:policyMap>
  </amq:destinationPolicy>

  <amq:transportConnectors>
    <amq:transportConnector uri="tcp://localhost:0"/>
  </amq:transportConnectors>

</amq:broker>
预期结果是根据RedeliveryPolicy重试,如果所有重试都失败,最终会进入dlq。
在消息驱动的通道适配器(message-driven-channel-adapter)上设置 acknowledge="transacted"。
Spring Integration 4.2及以后版本(当前版本为5.1.7)默认设置为该值;对于早期版本,您必须在配置中进行设置,以便流程在事务中运行,并在抛出异常后回滚出队。
如何在出站->入站->httpoutboundgateway(消息在此处失败)->外部服务中出现失败消息时使用 RedeliveryPolicy
失败的消息不会根据 RedeliveryPolicy 进行重试,主要是因为在异常发生之前消息已经出队。
<!-- Producer side: messages placed on jmsOutChannel are written to the JMS queue. -->
<int:channel id="jmsOutChannel"/>

<!-- Sends every message from jmsOutChannel through jmsTemplate to the queue
     whose name is given by the SpEL expression on somebean.queueName. -->
<jms:outbound-channel-adapter
    id="outboundJMSAdaptor"
    channel="jmsOutChannel"
    jms-template="jmsTemplate"
    destination-name="#{somebean.queueName}"/>
<!-- Consumer side: messages read from the JMS queue are placed on jmsInChannel. -->
<int:channel id="jmsInChannel"/>

<!-- Listens on the same queue the outbound adapter writes to.
     acknowledge="transacted" makes the downstream flow run inside the JMS
     session transaction, so an exception thrown further down (for example by
     the HTTP outbound gateway) rolls the receive back and the message is
     redelivered according to the RedeliveryPolicy instead of being lost.
     This is the default from Spring Integration 4.2 onwards; it is spelled
     out here so that earlier versions behave the same way. -->
<jms:message-driven-channel-adapter
    channel="jmsInChannel"
    destination-name="#{somebean.queueName}"
    connection-factory="jmsConnectionFactory"
    message-converter="jmsMessageConverter"
    acknowledge="transacted"/>
<!-- Adds the headers used by the HTTP call to each message coming from
     jmsInChannel and forwards the result to outbound_gateway_channel. -->
<int:header-enricher
    input-channel="jmsInChannel"
    output-channel="outbound_gateway_channel">

  <!-- Identifier taken from the payload; consumed later as the "id" URI variable. -->
  <int:header name="addressId" expression="payload.getId()"/>

  <!-- Fixed request metadata. -->
  <int:header name="Accept-Language" value="en_GB"/>
  <int:header name="X-Source-CountryCode" value="GB"/>
  <int:header name="X-Source-Operator" value="Enterprise"/>
  <int:header name="X-Source-Division" value="CustomerManagement"/>
  <int:header name="X-Source-System" value="${sapwebservices.http.header.source.system}"/>

  <!-- Timestamp computed per message; formatted in the JVM default time zone. -->
  <int:header name="X-Source-Timestamp"
              expression="new java.text.SimpleDateFormat('yyyy-MM-dd HH:mm:ss').format(new java.util.Date())"/>

  <!-- Content negotiation headers for the JSON request. -->
  <int:header name="Accept" value="application/json"/>
  <int:header name="Content-Type" value="application/json;charset=UTF-8"/>
</int:header-enricher>
<!-- Request factory used by the HTTP outbound gateway.
     The custom httpClient is injected first on purpose: properties are applied
     in declaration order, and older Spring versions apply the timeouts to
     whichever HttpClient is current when the timeout setter runs. Setting the
     client last could therefore drop the timeouts on those versions.
     Both timeouts share the same property value. -->
<bean id="httpRequestFactory"
      class="org.springframework.http.client.HttpComponentsClientHttpRequestFactory">
  <property name="httpClient" ref="httpClient"/>
  <property name="connectTimeout" value="${sapwebservices.http.rest.timeout}"/>
  <property name="readTimeout" value="${sapwebservices.http.rest.timeout}"/>
</bean>
<!-- Serialises the enriched message payload to JSON with the
     nonNullObjectMapper bean before it reaches the HTTP gateway. -->
<int:object-to-json-transformer
    object-mapper="nonNullObjectMapper"
    input-channel="outbound_gateway_channel"
    output-channel="outbound_gateway_with_json"/>
<!-- Sends the JSON payload to the external service with HTTP PUT and routes
     the String response to print_payload. Only the message headers that match
     mapped-request-headers are copied onto the HTTP request. -->
<http:outbound-gateway
    request-channel="outbound_gateway_with_json"
    reply-channel="print_payload"
    url="${sapwebservices.ws.uri.updatecustomershippingaddress}"
    http-method="PUT"
    charset="UTF-8"
    expected-response-type="java.lang.String"
    mapped-request-headers="Accept*, Content-Type, X-*, HTTP_REQUEST_HEADERS"
    request-factory="httpRequestFactory">

  <!-- NOTE(review): presumably the configured URL contains an {id} placeholder; confirm. -->
  <http:uri-variable name="id" expression="headers['addressId']"/>
</http:outbound-gateway>
以下是我的重新投递策略(RedeliveryPolicy)相关配置:
<!-- JmsTemplate used by the outbound channel adapter. It wraps the ActiveMQ
     connection factory in a SingleConnectionFactory.
     The target factory is referenced with ref="..." rather than
     <ref local="..."/>, because the "local" attribute is no longer accepted
     by the spring-beans 4.0+ schema; ref="..." resolves the same bean. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="messageConverter" ref="jmsMessageConverter"/>
  <property name="connectionFactory">
    <bean class="org.springframework.jms.connection.SingleConnectionFactory">
      <property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
    </bean>
  </property>
</bean>
<!-- ActiveMQ connection factory shared by the JmsTemplate and the
     message-driven channel adapter. It carries the redeliveryPolicy bean. -->
<bean id="jmsConnectionFactory"
      class="org.apache.activemq.spring.ActiveMQConnectionFactory">
  <!-- NOTE(review): broker address and credentials are hard-coded here while
       other settings use property placeholders; consider externalising. -->
  <property name="brokerURL" value="tcp://localhost:61616"/>
  <property name="userName" value="admin"/>
  <property name="password" value="admin"/>

  <!-- Redelivery settings applied to consumers created from this factory. -->
  <property name="redeliveryPolicy" ref="redeliveryPolicy"/>
  <property name="nonBlockingRedelivery" value="true"/>
</bean>
<!-- Redelivery policy referenced by jmsConnectionFactory. Every value comes
     from a sapwebservice.redeliveryPolicy.* property placeholder. -->
<bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
  <!-- Number of redelivery attempts. -->
  <property name="maximumRedeliveries"
            value="${sapwebservice.redeliveryPolicy.maximumRedeliveries}"/>

  <!-- Delay settings. -->
  <property name="initialRedeliveryDelay"
            value="${sapwebservice.redeliveryPolicy.initialRedeliveryDelay}"/>
  <property name="redeliveryDelay"
            value="${sapwebservice.redeliveryPolicy.redeliveryDelay}"/>

  <!-- Back-off settings. -->
  <property name="useExponentialBackOff"
            value="${sapwebservice.redeliveryPolicy.useExponentialBackOff}"/>
  <property name="backOffMultiplier"
            value="${sapwebservice.redeliveryPolicy.backOffMultiplier}"/>
</bean>
<!-- Queue destination named "demo.queue". It is not referenced by any other
     bean in the configuration shown here. -->
<bean id="demoQueue" class="org.apache.activemq.command.ActiveMQQueue">
  <constructor-arg value="demo.queue"/>
</bean>

<!-- Project-specific message converter used by the JmsTemplate and by the
     message-driven channel adapter. -->
<bean id="jmsMessageConverter" class="sap.converter.JMSMessageConverter"/>
<!-- Embedded, non-persistent ActiveMQ broker with JMX enabled.
     NOTE(review): the transport connector binds to tcp://localhost:0 while
     jmsConnectionFactory connects to tcp://localhost:61616; confirm that
     both are meant to address the same broker. -->
<amq:broker persistent="false" useJmx="true">

  <amq:destinationPolicy>
    <amq:policyMap>
      <amq:defaultEntry>
        <!-- ">" is the wildcard for every queue: each queue gets its own
             dead-letter queue whose name starts with "dlq.". -->
        <amq:policyEntry queue=">">
          <amq:deadLetterStrategy>
            <amq:individualDeadLetterStrategy
                queuePrefix="dlq."
                useQueueForQueueMessages="true"/>
          </amq:deadLetterStrategy>
        </amq:policyEntry>
      </amq:defaultEntry>
    </amq:policyMap>
  </amq:destinationPolicy>

  <amq:transportConnectors>
    <amq:transportConnector uri="tcp://localhost:0"/>
  </amq:transportConnectors>

</amq:broker>
预期结果是根据RedeliveryPolicy重试,如果所有重试都失败,最终会进入dlq。
在消息驱动的通道适配器(message-driven-channel-adapter)上设置 acknowledge="transacted"。
Spring Integration 4.2及以后版本(当前版本为5.1.7)默认设置为该值;对于早期版本,您必须在配置中进行设置,以便流程在事务中运行,并在抛出异常后回滚出队。