Spring 集成 - 异常和重试

Spring Integration - Exceptions and retries

我的设置:
我有一个使用 ActiveMQ 的消息守护进程,它将消耗 JSON 条消息。
JSON 消息的发布者添加 type header 值,例如 com.example.Foo,这是 json 消息的类型。我用它来将 json 转换为 pojo.

Spring 配置:
收到消息后,这些是它经过的步骤:
1. Transformer:将json转换为pojo
2. Payload type router:根据pojo的类型,将pojo路由到合适的服务激活器。
3.服务激活器:处理消息。

<int:chain input-channel="transformerChannel">
    <int:transformer id="jsonToPojoTransformer" ref="JsonToPojoTransformer" method="transform" />

    <int:payload-type-router default-output-channel="defaultChannel">
        <int:mapping type="com.example.Foo" channel="fooHandlerChannel"/>
        <int:mapping type="com.example.Bar" channel="barHandlerChannel"/>
    </int:payload-type-router>
</int:chain>

<int:service-activator input-channel="fooHandlerChannel" ref="fooHandler" method="onMessage"/>
<int:service-activator input-channel="barHandlerChannel" ref="barHandler" method="onMessage"/>

服务激活器定义:

public class FooHandler {
    public void onMessage(Foo foo) {...}
}

问题:

  1. 我想知道如何访问服务激活器中的消息 header。服务激活器似乎无法访问消息 headers,因为转换器正在返回一个 pojo。

  2. 假设服务激活器出于任何原因无法调用下游休息服务。我现在想跳过处理这条消息,我想稍后重试这条消息。或者假设在处理此消息时出现异常。我想在延迟一段时间后重试处理此消息。我该如何完成?

--编辑--
根据 Artem 的评论删除了细节以减少问题大小。

请尽量不要在 SO 中制作这么长的主题。如果有很多问题,就很难回答特定的问题。

完全不清楚为什么您无法通过服务激活器方法访问 header。您可以接受整个 Message<>,并调用其 getHeaders(). You can use@Headersannotation on theMaparg to get headers from the message. You can use@Header` 注释来准确提取特定 header 来自消息。

即使您的转换器方法 returns 只是一个 POJO,这并不意味着它没有用来自 requestMessage 的 headers 包装到 Message。如果您需要 return 特定的 header 以及您的 POJO,您应该使用 MessageBuilder 自己创建 Message 并且不要忘记复制 requestMessage headers , 只是因为如果整个消息是 returned.

transformer 不会复制请求 headers

您必须在 JMS 消费者上支持 TX,这样 RuntimeException 将导致回滚,并因此最终重新传送。并且您应该确保所有流程都在同一个线程中执行。否则 TX 被提交并且消息在代理上被确认。当您没有交易时也会发生同样的情况。