Connecting two Queue Channels backed by a JDBC Store in Spring Integration DSL
I am trying to connect two queue channels (backed by a JDBC store).
    @Configuration
    public class DemoIntegration {

        @Bean
        public IntegrationFlow flow(MessageChannel firstJDBCChannel,
                                    MessageChannel secondJDBCChannel) {
            return IntegrationFlows.from(firstJDBCChannel)
                    .bridge(bridgeHandler -> bridgeHandler.poller(p -> p.fixedDelay(100L)))
                    .handle(secondJDBCChannel)
                    .get();
        }
    }
I have tried putting different constructs between the two channels, but I keep getting an error like this:
Caused by: java.lang.IllegalArgumentException: Found ambiguous
parameter type [class java.lang.Void] for method match: [public
boolean
org.springframework.integration.channel.AbstractPollableChannel.removeInterceptor(org.springframework.messaging.support.ChannelInterceptor),
public org.springframework.messaging.Message
org.springframework.integration.channel.AbstractPollableChannel.receive(long),
public final void
org.springframework.integration.context.IntegrationObjectSupport.setPrimaryExpression(org.springframework.expression.Expression),
public void
org.springframework.integration.channel.AbstractMessageChannel.setStatsEnabled(boolean),
public void
org.springframework.integration.channel.AbstractMessageChannel.setMessageConverter(org.springframework.messaging.converter.MessageConverter),
public void
org.springframework.integration.channel.AbstractMessageChannel.setDatatypes(java.lang.Class...),
public void
org.springframework.integration.channel.AbstractMessageChannel.configureMetrics(org.springframework.integration.support.management.AbstractMessageChannelMetrics),
public void
org.springframework.integration.context.IntegrationObjectSupport.setComponentName(java.lang.String),
public org.springframework.messaging.support.ChannelInterceptor
org.springframework.integration.channel.AbstractPollableChannel.removeInterceptor(int),
public void
org.springframework.integration.context.IntegrationObjectSupport.setMessageBuilderFactory(org.springframework.integration.support.MessageBuilderFactory),
public java.util.List<org.springframework.messaging.Message<?>>
org.springframework.integration.channel.QueueChannel.purge(org.springframework.integration.core.MessageSelector),
public void
org.springframework.integration.context.IntegrationObjectSupport.setApplicationContext(org.springframework.context.ApplicationContext)
throws org.springframework.beans.BeansException, public void
org.springframework.integration.context.IntegrationObjectSupport.setBeanFactory(org.springframework.beans.factory.BeanFactory),
public org.springframework.expression.Expression
org.springframework.integration.context.IntegrationObjectSupport.getExpression(),
public void
org.springframework.integration.channel.AbstractPollableChannel.setInterceptors(java.util.List),
public void
org.springframework.integration.context.IntegrationObjectSupport.setChannelResolver(org.springframework.messaging.core.DestinationResolver)]
    at org.springframework.util.Assert.isNull(Assert.java:155) ~[spring-core-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.findHandlerMethodsForTarget(MessagingMethodInvokerHelper.java:776) ~[spring-integration-core-5.0.0.RC1.jar:5.0.0.RC1]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:379) ~[spring-integration-core-5.0.0.RC1.jar:5.0.0.RC1]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:225) ~[spring-integration-core-5.0.0.RC1.jar:5.0.0.RC1]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:220) ~[spring-integration-core-5.0.0.RC1.jar:5.0.0.RC1]
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.<init>(MethodInvokingMessageProcessor.java:60) ~[spring-integration-core-5.0.0.RC1.jar:5.0.0.RC1]
    at org.springframework.integration.handler.ServiceActivatingHandler.<init>(ServiceActivatingHandler.java:38) ~[spring-integration-core-5.0.0.RC1.jar:5.0.0.RC1]
    at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:924) ~[spring-integration-core-5.0.0.RC1.jar:5.0.0.RC1]
    at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:904) ~[spring-integration-core-5.0.0.RC1.jar:5.0.0.RC1]
    at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:891) ~[spring-integration-core-5.0.0.RC1.jar:5.0.0.RC1]
    at name.karwowski.blazej.integrationdemo2.DemoIntegration.flow(DemoIntegration.java:16) ~[classes/:na]
    at name.karwowski.blazej.integrationdemo2.DemoIntegration$$EnhancerBySpringCGLIB$$f82aadc3.CGLIB$flow$0(<generated>) ~[classes/:na]
    at name.karwowski.blazej.integrationdemo2.DemoIntegration$$EnhancerBySpringCGLIB$$f82aadc3$$FastClassBySpringCGLIB$$ef9b4b0c.invoke(<generated>) ~[classes/:na]
    at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:228) ~[spring-core-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:361) ~[spring-context-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at name.karwowski.blazej.integrationdemo2.DemoIntegration$$EnhancerBySpringCGLIB$$f82aadc3.flow(<generated>) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_151]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_151]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_151]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_151]
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:155) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    ... 19 common frames omitted
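How do I correctly connect two (or more) queue channels? I need to do some processing between them, and the messages must be persisted when the application stops.
The full log and a code sample are on GitHub: https://github.com/blazejkarwowski/integration-test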
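You must use .channel(secondJDBCChannel) instead of .handle(secondJDBCChannel). There is nothing to handle: it is just a channel in between. Passing the channel to .handle() makes the DSL treat it as a service object and search its public methods for a handler method, which is why the exception lists the channel's own methods as ambiguous candidates.
See the Reference Manual for more information.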
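A minimal sketch of the corrected flow, assuming the Spring Integration 5.0 Java DSL seen in the stack trace and the two channel beans from the question. The class name `FixedDemoIntegration` is illustrative and not taken from the original project; the only change to the flow itself is `.channel(...)` in place of `.handle(...)`.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;

// Hypothetical class name; the channel beans are the ones defined in the question's project.
@Configuration
public class FixedDemoIntegration {

    @Bean
    public IntegrationFlow flow(MessageChannel firstJDBCChannel,
                                MessageChannel secondJDBCChannel) {
        return IntegrationFlows.from(firstJDBCChannel)
                // firstJDBCChannel is a queue channel, so the endpoint consuming it needs a poller
                .bridge(bridgeHandler -> bridgeHandler.poller(p -> p.fixedDelay(100L)))
                // send the message on to the second channel rather than treating it as a handler
                .channel(secondJDBCChannel)
                .get();
    }
}
```

Any processing step would presumably sit between the bridge and `.channel(secondJDBCChannel)`. Since the second channel is also a queue channel, whatever consumes it in another flow would need its own poller as well.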