Spring 集成 JMS 使用 DSL 确保消息传递

Spring Integration JMS assured message delivery using DSL

我正在尝试创建一个流 (1),其中从可以是客户端或服务器的 TCP 适配器接收消息并将消息发送到 ActiveMQ 代理。

我的另一个流程(2) 从所需队列中挑选消息并发送到目的地

TCP(client/server) ==(1)==> ActiveMQ 代理 ==(2)==> HTTP 出站适配器

我想确保万一我的邮件没有送达所需的目的地,它会重新尝试再次发送邮件。

我目前流向经纪人的流量 (1) 是:

IntegrationFlow flow = IntegrationFlows
            .from(Tcp
                    .inboundAdapter(Tcp.netServer(Integer.parseInt(1234))
                            .serializer(customSerializer).deserializer(customSerializer)
                            .id("server").soTimeout(5000))
                    .id(hostConnection.getConnectionNumber() + "adapter"))).channel(directChannel())
            .wireTap("tcpInboundMessageLogChannel").channel(directChannel())
            .handle(Jms.outboundAdapter(activeMQConnectionFactory)
                    .destination("jmsInbound"))
            .get();

    this.flowContext.registration(flow).id("outflow").register();

我的流程(2) 从代理到 http 出站:

flow = IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(activeMQConnectionFactory)
                    .destination("jmsInbound"))
            .channel(directChannel())
            .handle(Http.outboundChannelAdapter(hostConnection.getUrl()).httpMethod(HttpMethod.POST)
                    .expectedResponseType(String.class)
                    .mappedRequestHeaders("abc"))
            .get();
    this.flowContext.registration(flow).id("inflow").register();

问题:

所以,我想要没有消息丢失并且消息将按顺序处理的场景。

第一:我相信你可以为无限重试配置jmsInbound

/**
 * Configuration options for a messageConsumer used to control how messages are re-delivered when they
 * are rolled back.
 * May be used server side on a per destination basis via the Broker RedeliveryPlugin
 *
 * @org.apache.xbean.XBean element="redeliveryPolicy"
 *
 */
public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable, Serializable {

另一方面,您可以为 RequestHandlerRetryAdvice 配置 .handle(Http.outboundChannelAdapter( 以实现类似的重试行为,但在应用程序内部无需往返 JMS 和返回:https://docs.spring.io/spring-integration/docs/5.0.6.RELEASE/reference/html/messaging-endpoints-chapter.html#retry-advice

这里是一些如何从 Java DSL 角度进行配置的示例:

    @Bean
    public IntegrationFlow errorRecovererFlow() {
        return IntegrationFlows.from(Function.class, "errorRecovererFunction")
                .handle((GenericHandler<?>) (p, h) -> {
                    throw new RuntimeException("intentional");
                }, e -> e.advice(retryAdvice()))
                .get();
    }

    @Bean
    public RequestHandlerRetryAdvice retryAdvice() {
        RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
        requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel()));
        return requestHandlerRetryAdvice;
    }

    @Bean
    public MessageChannel recoveryChannel() {
        return new DirectChannel();
    }

RequestHandlerRetryAdvice 可以配置 RetryTemplate 以应用类似 AlwaysRetryPolicy 的内容。有关详细信息,请参阅 Spring 重试项目:https://github.com/spring-projects/spring-retry