Connecting two Queue Channels backed by a JDBC store in the Spring Integration DSL

I'm trying to connect two queue channels (both backed by a JDBC message store).

@Configuration
public class DemoIntegration {
    @Bean
    public IntegrationFlow flow(MessageChannel firstJDBCChannel,
                                MessageChannel secondJDBCChannel) {
        return IntegrationFlows.from(firstJDBCChannel)
                .bridge(bridgeHandler -> bridgeHandler.poller(p -> p.fixedDelay(100L)))
                .handle(secondJDBCChannel)
                .get();
    }
}

I have tried putting different constructs between the two channels, but I still get errors like this:

Caused by: java.lang.IllegalArgumentException: Found ambiguous parameter type [class java.lang.Void] for method match: [public boolean org.springframework.integration.channel.AbstractPollableChannel.removeInterceptor(org.springframework.messaging.support.ChannelInterceptor), public org.springframework.messaging.Message org.springframework.integration.channel.AbstractPollableChannel.receive(long), public final void org.springframework.integration.context.IntegrationObjectSupport.setPrimaryExpression(org.springframework.expression.Expression), public void org.springframework.integration.channel.AbstractMessageChannel.setStatsEnabled(boolean), public void org.springframework.integration.channel.AbstractMessageChannel.setMessageConverter(org.springframework.messaging.converter.MessageConverter), public void org.springframework.integration.channel.AbstractMessageChannel.setDatatypes(java.lang.Class...), public void org.springframework.integration.channel.AbstractMessageChannel.configureMetrics(org.springframework.integration.support.management.AbstractMessageChannelMetrics), public void org.springframework.integration.context.IntegrationObjectSupport.setComponentName(java.lang.String), public org.springframework.messaging.support.ChannelInterceptor org.springframework.integration.channel.AbstractPollableChannel.removeInterceptor(int), public void org.springframework.integration.context.IntegrationObjectSupport.setMessageBuilderFactory(org.springframework.integration.support.MessageBuilderFactory), public java.util.List org.springframework.integration.channel.QueueChannel.purge(org.springframework.integration.core.MessageSelector), public void org.springframework.integration.context.IntegrationObjectSupport.setApplicationContext(org.springframework.context.ApplicationContext) throws org.springframework.beans.BeansException, public void org.springframework.integration.context.IntegrationObjectSupport.setBeanFactory(org.springframework.beans.factory.BeanFactory), public org.springframework.expression.Expression org.springframework.integration.context.IntegrationObjectSupport.getExpression(), public void org.springframework.integration.channel.AbstractPollableChannel.setInterceptors(java.util.List), public void org.springframework.integration.context.IntegrationObjectSupport.setChannelResolver(org.springframework.messaging.core.DestinationResolver)]
    at org.springframework.util.Assert.isNull(Assert.java:155) ~[spring-core-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.findHandlerMethodsForTarget(MessagingMethodInvokerHelper.java:776) ~[spring-integration-core-5.0.0.RC1.jar:5.0.0.RC1]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:379) ~[spring-integration-core-5.0.0.RC1.jar:5.0.0.RC1]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:225) ~[spring-integration-core-5.0.0.RC1.jar:5.0.0.RC1]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:220) ~[spring-integration-core-5.0.0.RC1.jar:5.0.0.RC1]
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.<init>(MethodInvokingMessageProcessor.java:60) ~[spring-integration-core-5.0.0.RC1.jar:5.0.0.RC1]
    at org.springframework.integration.handler.ServiceActivatingHandler.<init>(ServiceActivatingHandler.java:38) ~[spring-integration-core-5.0.0.RC1.jar:5.0.0.RC1]
    at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:924) ~[spring-integration-core-5.0.0.RC1.jar:5.0.0.RC1]
    at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:904) ~[spring-integration-core-5.0.0.RC1.jar:5.0.0.RC1]
    at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:891) ~[spring-integration-core-5.0.0.RC1.jar:5.0.0.RC1]
    at name.karwowski.blazej.integrationdemo2.DemoIntegration.flow(DemoIntegration.java:16) ~[classes/:na]
    at name.karwowski.blazej.integrationdemo2.DemoIntegration$$EnhancerBySpringCGLIB$$f82aadc3.CGLIB$flow$0(<generated>) ~[classes/:na]
    at name.karwowski.blazej.integrationdemo2.DemoIntegration$$EnhancerBySpringCGLIB$$f82aadc3$$FastClassBySpringCGLIB$$ef9b4b0c.invoke(<generated>) ~[classes/:na]
    at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:228) ~[spring-core-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:361) ~[spring-context-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at name.karwowski.blazej.integrationdemo2.DemoIntegration$$EnhancerBySpringCGLIB$$f82aadc3.flow(<generated>) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_151]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_151]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_151]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_151]
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:155) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    ... 19 common frames omitted

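How do I connect two (or more) queue channels correctly? I need to do some processing between them, and the messages must be preserved when the application stops.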

The full log and a code sample are on GitHub: https://github.com/blazejkarwowski/integration-test

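You have to use .channel(secondJDBCChannel) instead of .handle(secondJDBCChannel). There is nothing to handle here; it is just a channel sitting between endpoints. As the stack trace shows, .handle() wraps its argument in a ServiceActivatingHandler and searches it for a method to invoke, and that search is what fails with the "ambiguous parameter type" error when the argument is a channel.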
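For illustration, here is a minimal sketch of the flow from the question with that one change applied. It assumes the two channel beans are defined elsewhere, as in the question. The transform step is a hypothetical placeholder for the "processing in between" and assumes String payloads; it is not part of the original code.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;

@Configuration
public class DemoIntegration {

    @Bean
    public IntegrationFlow flow(MessageChannel firstJDBCChannel,
                                MessageChannel secondJDBCChannel) {
        return IntegrationFlows.from(firstJDBCChannel)
                // The first channel is a queue (pollable), so the endpoint
                // that consumes from it needs a poller.
                .bridge(e -> e.poller(p -> p.fixedDelay(100L)))
                // Hypothetical processing step (assumes String payloads);
                // replace it with the real logic or remove it.
                .<String, String>transform(payload -> payload.toUpperCase())
                // Route the result into the second queue channel
                // instead of trying to "handle" it.
                .channel(secondJDBCChannel)
                .get();
    }
}
```

Nothing in this flow reads from secondJDBCChannel, so messages should remain in its store until some other polling endpoint consumes them.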

See the Reference Manual for more information.