Spring 集成 DSL JmsMessageDrivenChannelAdapter 错误处理程序处理后无限重试

Spring Integration DSL JmsMessageDrivenChannelAdapter infinite retry after the error handler processing

背景:

我们在 Spring XML 配置中编写了一个 Spring 集成适配器,如下所示。它在所有场景 w.r.to 错误处理中都能完美运行。错误处理所做的所有事情就是将错误消息写入队列。现在我们需要将此 xml 配置转换为 DSL,我们已使用以下代码对其进行了更改。

问题:

每当 'inputChannel' 链中发生错误时,我们希望错误处理进行一些检查并将错误写入错误队列并且不要重试有效负载。 Spring XML 正在做真正需要做的事情,但是当我们在将错误消息放入错误队列后将其更改为 DSL 时,有效负载被写回输入队列并且队列中的错误消息消失并且这是一个永无止境的循环。

我们所做的分析:

错误消息写入错误队列后没有发生任何错误,DSL 适配器配置没有任何此类要处理的内容。

任何 help/direction 解决这个问题的人都非常感谢。

正在工作 Spring XML 适配器:

<int-jms:message-driven-channel-adapter
        channel="inputChannel" container="jmsContainer" extract-payload="true" />


<beans:bean id="jmsContainer"        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <beans:property name="connectionFactory" ref="connectionFactory" />
    <beans:property name="destinationName" value="Queue.test" />
    <beans:property name="errorHandler" ref="errorHandler" />
</beans:bean> 

DSL 中有问题的适配器:

private JmsMessageDrivenChannelAdapter MessageDrivenChannelAdapter(
        String destinationName, String key) throws Exception {
    JmsMessageDrivenChannelAdapter channelAdapter = Jms
            .messageDriverChannelAdapter(connectionFactory)
            .outputChannel(inputChannel)
            .configureListenerContainer(
                    c -> c.errorHandler(errorHandler))
            .destination(destinationName)
            .setHeaderMapper(new HeaderMapper(getChannelHeaders(key)))
            .get();
    return channelAdapter;
}

有一些问题:

  1. 你没有展示你是如何使用它的 MessageDrivenChannelAdapter()

  2. 您应该共享 DEBUG 日志以演示消息应该如何传输以及如何不传输。

如果我是你,我会将 XML 转换为 Java DSL:

@Bean
public DefaultMessageListenerContainer jmsContainer() {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(jmsConnectionFactory());
    container.setDestinationName("Queue.test");
    container.setErrorHandler(errorHandler);
    return container;
}

@Bean
public IntegrationFlow myJmsFlow() {
    return IntegrationFlows.from(
            Jms.messageDrivenChannelAdapter(jmsContainer())
                    .extractPayload(true))
            .channel(inputChannel)
            .get();
}

要点是 jmsContainer bean,因为它在您的 XML 配置中。

并注意我如何使用 Jms.messageDrivenChannelAdapter() - 来自 IntegrationFlows.from() 而没有 get() 调用。

如果你要使用那个MessageDrivenChannelAdapter()方法,它必须是public@Bean,否则IntegrationComponentSpec的所有内部结构都无法工作因为它们在 .get() 调用后丢失了。