Spring 集成 - 异常和重试
Spring Integration - Exceptions and retries
我的设置:
我有一个使用 ActiveMQ 的消息守护进程,它将消耗 JSON 条消息。
JSON 消息的发布者添加 type
header 值,例如 com.example.Foo
,这是 json 消息的类型。我用它来将 json 转换为 pojo.
Spring 配置:
收到消息后,这些是它经过的步骤:
1. Transformer:将json转换为pojo
2. Payload type router:根据pojo的类型,将pojo路由到合适的服务激活器。
3.服务激活器:处理消息。
<int:chain input-channel="transformerChannel">
<int:transformer id="jsonToPojoTransformer" ref="JsonToPojoTransformer" method="transform" />
<int:payload-type-router default-output-channel="defaultChannel">
<int:mapping type="com.example.Foo" channel="fooHandlerChannel"/>
<int:mapping type="com.example.Bar" channel="barHandlerChannel"/>
</int:payload-type-router>
</int:chain>
<int:service-activator input-channel="fooHandlerChannel" ref="fooHandler" method="onMessage"/>
<int:service-activator input-channel="barHandlerChannel" ref="barHandler" method="onMessage"/>
服务激活器定义:
public class FooHandler {
public void onMessage(Foo foo) {...}
}
问题:
我想知道如何访问服务激活器中的消息 header。服务激活器似乎无法访问消息 headers,因为转换器正在返回一个 pojo。
假设服务激活器出于任何原因无法调用下游休息服务。我现在想跳过处理这条消息,我想稍后重试这条消息。或者假设在处理此消息时出现异常。我想在延迟一段时间后重试处理此消息。我该如何完成?
--编辑--
根据 Artem 的评论删除了细节以减少问题大小。
请尽量不要在 SO 中制作这么长的主题。如果有很多问题,就很难回答特定的问题。
完全不清楚为什么您无法通过服务激活器方法访问 header。您可以接受整个 Message<>
,并调用其 getHeaders(). You can use
@Headersannotation on the
Maparg to get headers from the message. You can use
@Header` 注释来准确提取特定 header 来自消息。
即使您的转换器方法 returns 只是一个 POJO,这并不意味着它没有用来自 requestMessage 的 headers 包装到 Message
。如果您需要 return 特定的 header 以及您的 POJO,您应该使用 MessageBuilder
自己创建 Message
并且不要忘记复制 requestMessage headers , 只是因为如果整个消息是 returned.
transformer 不会复制请求 headers
您必须在 JMS 消费者上支持 TX,这样 RuntimeException
将导致回滚,并因此最终重新传送。并且您应该确保所有流程都在同一个线程中执行。否则 TX 被提交并且消息在代理上被确认。当您没有交易时也会发生同样的情况。
我的设置:
我有一个使用 ActiveMQ 的消息守护进程,它将消耗 JSON 条消息。
JSON 消息的发布者添加 type
header 值,例如 com.example.Foo
,这是 json 消息的类型。我用它来将 json 转换为 pojo.
Spring 配置:
收到消息后,这些是它经过的步骤:
1. Transformer:将json转换为pojo
2. Payload type router:根据pojo的类型,将pojo路由到合适的服务激活器。
3.服务激活器:处理消息。
<int:chain input-channel="transformerChannel">
<int:transformer id="jsonToPojoTransformer" ref="JsonToPojoTransformer" method="transform" />
<int:payload-type-router default-output-channel="defaultChannel">
<int:mapping type="com.example.Foo" channel="fooHandlerChannel"/>
<int:mapping type="com.example.Bar" channel="barHandlerChannel"/>
</int:payload-type-router>
</int:chain>
<int:service-activator input-channel="fooHandlerChannel" ref="fooHandler" method="onMessage"/>
<int:service-activator input-channel="barHandlerChannel" ref="barHandler" method="onMessage"/>
服务激活器定义:
public class FooHandler {
public void onMessage(Foo foo) {...}
}
问题:
我想知道如何访问服务激活器中的消息 header。服务激活器似乎无法访问消息 headers,因为转换器正在返回一个 pojo。
假设服务激活器出于任何原因无法调用下游休息服务。我现在想跳过处理这条消息,我想稍后重试这条消息。或者假设在处理此消息时出现异常。我想在延迟一段时间后重试处理此消息。我该如何完成?
--编辑--
根据 Artem 的评论删除了细节以减少问题大小。
请尽量不要在 SO 中制作这么长的主题。如果有很多问题,就很难回答特定的问题。
完全不清楚为什么您无法通过服务激活器方法访问 header。您可以接受整个 Message<>
,并调用其 getHeaders(). You can use
@Headersannotation on the
Maparg to get headers from the message. You can use
@Header` 注释来准确提取特定 header 来自消息。
即使您的转换器方法 returns 只是一个 POJO,这并不意味着它没有用来自 requestMessage 的 headers 包装到 Message
。如果您需要 return 特定的 header 以及您的 POJO,您应该使用 MessageBuilder
自己创建 Message
并且不要忘记复制 requestMessage headers , 只是因为如果整个消息是 returned.
您必须在 JMS 消费者上支持 TX,这样 RuntimeException
将导致回滚,并因此最终重新传送。并且您应该确保所有流程都在同一个线程中执行。否则 TX 被提交并且消息在代理上被确认。当您没有交易时也会发生同样的情况。