Spring 集成 AMQP - 不顾重试建议不断重试
Spring Integration AMQP - retries continuously despite retry advice
我之前使用 Spring 与 JMS 的集成非常成功,但我们现在使用 RabbitMQ/AMQP 并且在错误处理方面遇到了一些问题。
我有一个带有 errorChannel 的 int-amqp:inbound-channel-adapter 设置为接收任何异常,这里 ErrorTransformer class 检查失败消息的原因异常。然后根据异常的 type :-
抑制异常并转换为 JSON 对象,该对象可以作为解释失败的业务回复转到 AMQP outbound-channel-adapter。这里我想要原始消息consumed/ACKed。
或者重新抛出引发异常,让 RabbitMQ 重新传递消息一定次数。
我发现重抛导致消息不断重投,后来看了StatefulRetryOperationsInterceptorFactoryBean,加了一个advice chain重试3次,然后又报no message-id异常,所以也加了a 'MissingMessageIdAdvice' 在建议链的开头。
尽管有建议,我仍然连续重试 一个从 errorChannel 的 ErrorTransformer 重新抛出的 RuntimeException。我仅使用默认值通过 RabbitMQ 管理员发布消息。不确定是否缺少 message-id 导致此功能无法正常工作,如果是这样,我如何获得具有 id 的消息?
我对 :-
之间的区别感到困惑
A) ConditionalRejectingErrorHandler(我已设置为入站适配器的错误处理程序),它允许我提供 customFatalExceptionStrategy 来实现 isFatal()。我相信 fatal=true (意味着丢弃)并且消息被消耗和丢弃,但我如何仍然发送出站失败消息?
B) 以及我在入站适配器上的 errorChannel,我用它来检查异常并转换为出站失败响应消息。在这里我想我可以抛出 AmqpRejectAndDontRequeueException,但是为什么还要抛出 ConditionalRejectingErrorHandler?并且会抛出 AmqpRejectAndDontRequeueException 工作
<int-amqp:inbound-channel-adapter id="amqpInRequestPatternValuation" channel="requestAmqpIn" channel-transacted="true" transaction-manager="transactionManager"
queue-names="requestQueue" error-channel="patternValuationErrorChannel" connection-factory="connectionFactory"
receive-timeout="59000" concurrent-consumers="1"
advice-chain="retryChain" error-handler="customErrorHandler" />
<bean id="customErrorHandler" class="org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler">
<constructor-arg ref="customFatalExceptionStrategy"/>
</bean>
<bean id="customFatalExceptionStrategy" class="abc.common.CustomFatalExceptionStrategy"/>
<!-- ADVICE CHAIN FOR CONTROLLING NUMBER OF RE-TRIES before sending to DLQ (or discarding if no DLQ) without this any re-queued fatal message will retry forever -->
<util:list id="retryChain">
<bean class="org.springframework.amqp.rabbit.retry.MissingMessageIdAdvice">
<constructor-arg>
<bean class="org.springframework.retry.policy.MapRetryContextCache" />
</constructor-arg>
</bean>
<ref bean="retryInterceptor" />
</util:list>
<bean id="retryInterceptor"
class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
<property name="retryOperations" ref="retryTemplate" />
<property name="messageRecoverer" ref="messageRecoverer"/>
</bean>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="retryPolicy" ref="simpleRetryPolicy" />
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="10000" />
</bean>
</property>
</bean>
<bean id="simpleRetryPolicy" class="org.springframework.retry.policy.SimpleRetryPolicy">
<property name="maxAttempts" value="3" />
</bean>
您必须使用RejectAndDontRequeueRecoverer
在重试结束时停止重新投递:
* MessageRecover that causes the listener container to reject
* the message without requeuing. This enables failed messages
* to be sent to a Dead Letter Exchange/Queue, if the broker is
* so configured.
是的,messageId
对于重试用例很重要。
如果您无法在发送过程中手动提供消息,您可以注入自定义 MessageKeyGenerator
策略来确定消息中的唯一键。
我一直没有时间发布解决方案,所以就在这里。
一旦我将重试建议链配置到 AMQP 入站通道适配器,它必须包含 RejectAndDontRequeueRecoverer 的 messageRecoverer(我相信这也是错误)。我遗漏的重点是确保发件人发送消息时包含 message_id。因此,如果通过 RabbitMQ 管理控制台发布,我需要包含预定义的 message_id 属性 并提供一个值。
使用 'MissingMessageIdAdvice' 没有帮助(所以我删除了),因为它会在每次重新传递消息时生成不同的 message_id 导致重试计数不递增每次交付都被认为与上次不同
我之前使用 Spring 与 JMS 的集成非常成功,但我们现在使用 RabbitMQ/AMQP 并且在错误处理方面遇到了一些问题。
我有一个带有 errorChannel 的 int-amqp:inbound-channel-adapter 设置为接收任何异常,这里 ErrorTransformer class 检查失败消息的原因异常。然后根据异常的 type :-
抑制异常并转换为 JSON 对象,该对象可以作为解释失败的业务回复转到 AMQP outbound-channel-adapter。这里我想要原始消息consumed/ACKed。
或者重新抛出引发异常,让 RabbitMQ 重新传递消息一定次数。
我发现重抛导致消息不断重投,后来看了StatefulRetryOperationsInterceptorFactoryBean,加了一个advice chain重试3次,然后又报no message-id异常,所以也加了a 'MissingMessageIdAdvice' 在建议链的开头。
尽管有建议,我仍然连续重试 一个从 errorChannel 的 ErrorTransformer 重新抛出的 RuntimeException。我仅使用默认值通过 RabbitMQ 管理员发布消息。不确定是否缺少 message-id 导致此功能无法正常工作,如果是这样,我如何获得具有 id 的消息? 我对 :-
之间的区别感到困惑A) ConditionalRejectingErrorHandler(我已设置为入站适配器的错误处理程序),它允许我提供 customFatalExceptionStrategy 来实现 isFatal()。我相信 fatal=true (意味着丢弃)并且消息被消耗和丢弃,但我如何仍然发送出站失败消息?
B) 以及我在入站适配器上的 errorChannel,我用它来检查异常并转换为出站失败响应消息。在这里我想我可以抛出 AmqpRejectAndDontRequeueException,但是为什么还要抛出 ConditionalRejectingErrorHandler?并且会抛出 AmqpRejectAndDontRequeueException 工作
<int-amqp:inbound-channel-adapter id="amqpInRequestPatternValuation" channel="requestAmqpIn" channel-transacted="true" transaction-manager="transactionManager"
queue-names="requestQueue" error-channel="patternValuationErrorChannel" connection-factory="connectionFactory"
receive-timeout="59000" concurrent-consumers="1"
advice-chain="retryChain" error-handler="customErrorHandler" />
<bean id="customErrorHandler" class="org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler">
<constructor-arg ref="customFatalExceptionStrategy"/>
</bean>
<bean id="customFatalExceptionStrategy" class="abc.common.CustomFatalExceptionStrategy"/>
<!-- ADVICE CHAIN FOR CONTROLLING NUMBER OF RE-TRIES before sending to DLQ (or discarding if no DLQ) without this any re-queued fatal message will retry forever -->
<util:list id="retryChain">
<bean class="org.springframework.amqp.rabbit.retry.MissingMessageIdAdvice">
<constructor-arg>
<bean class="org.springframework.retry.policy.MapRetryContextCache" />
</constructor-arg>
</bean>
<ref bean="retryInterceptor" />
</util:list>
<bean id="retryInterceptor"
class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
<property name="retryOperations" ref="retryTemplate" />
<property name="messageRecoverer" ref="messageRecoverer"/>
</bean>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="retryPolicy" ref="simpleRetryPolicy" />
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="10000" />
</bean>
</property>
</bean>
<bean id="simpleRetryPolicy" class="org.springframework.retry.policy.SimpleRetryPolicy">
<property name="maxAttempts" value="3" />
</bean>
您必须使用RejectAndDontRequeueRecoverer
在重试结束时停止重新投递:
* MessageRecover that causes the listener container to reject
* the message without requeuing. This enables failed messages
* to be sent to a Dead Letter Exchange/Queue, if the broker is
* so configured.
是的,messageId
对于重试用例很重要。
如果您无法在发送过程中手动提供消息,您可以注入自定义 MessageKeyGenerator
策略来确定消息中的唯一键。
我一直没有时间发布解决方案,所以就在这里。
一旦我将重试建议链配置到 AMQP 入站通道适配器,它必须包含 RejectAndDontRequeueRecoverer 的 messageRecoverer(我相信这也是错误)。我遗漏的重点是确保发件人发送消息时包含 message_id。因此,如果通过 RabbitMQ 管理控制台发布,我需要包含预定义的 message_id 属性 并提供一个值。
使用 'MissingMessageIdAdvice' 没有帮助(所以我删除了),因为它会在每次重新传递消息时生成不同的 message_id 导致重试计数不递增每次交付都被认为与上次不同