EvaluationException:此命令处理器不支持方法 'start'
EvaluationException: The method 'start' is not supported by this command processor
我想使用 spring 集成开发控制总线示例。作为示例,我采用了以下示例:https://github.com/spring-projects/spring-integration-samples/tree/master/basic/control-bus
我决定做同样的事情,但使用 java DSL。
我还阅读了以下主题: 但它对我没有帮助。
现在我有以下源代码:
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from("operationChannel")
.controlBus()
.get();
}
@Bean
@InboundChannelAdapter(channel = "adapterOutputChanel", autoStartup = "false", poller = @Poller(fixedDelay = "1000"))
public MessageSource<String> inboundAdapter() {
return new MessageSource<String>() {
@Override
public Message receive() {
return new Message() {
@Override
public String getPayload() {
return "some_output_message";
}
@Override
public MessageHeaders getHeaders() {
return null;
}
};
}
};
}
@Bean
public AbstractMessageChannel adapterOutputChanel() {
return new QueueChannel();
}
}
和应用:
@SpringBootApplication
public class MyApplication {
public static void main(String[] args) {
ConfigurableApplicationContext ctx = new SpringApplication(MyApplication.class).run(args);
MessageChannel controlChannel = ctx.getBean("operationChannel", MessageChannel.class);
PollableChannel adapterOutputChanel = ctx.getBean("adapterOutputChanel", PollableChannel.class);
controlChannel.send(new GenericMessage<String>("@inboundAdapter.start()"));
adapterOutputChanel.receive(1000);
}
}
但是当我启动应用程序时,我看到以下日志:
2019-08-26 16:09:30.901 INFO 10532 --- [ main] control_bus.MyApplication : Started MyApplication in 1.248 seconds (JVM running for 2.401)
Exception in thread "main" org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@7351a16e] (controlBusFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0)]; nested exception is org.springframework.expression.EvaluationException: The method 'start' is not supported by this command processor. If using the Control Bus, consider adding @ManagedOperation or @ManagedAttribute., failedMessage=GenericMessage [payload=@'inboundAdapter'.start(), headers={id=aef8f0dc-c3f5-5f7b-1a6d-7c9041f2d000, timestamp=1566824970903}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:186)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at control_bus.MyApplication.main(MyApplication.java:18)
Caused by: org.springframework.expression.EvaluationException: The method 'start' is not supported by this command processor. If using the Control Bus, consider adding @ManagedOperation or @ManagedAttribute.
at org.springframework.integration.handler.ExpressionCommandMessageProcessor$ExpressionCommandMethodResolver.validateMethod(ExpressionCommandMessageProcessor.java:114)
at org.springframework.integration.handler.ExpressionCommandMessageProcessor$ExpressionCommandMethodResolver.resolve(ExpressionCommandMessageProcessor.java:95)
at org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:205)
at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:134)
at org.springframework.expression.spel.ast.MethodReference.access[=13=]0(MethodReference.java:54)
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:390)
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:90)
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:114)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:365)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:156)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:151)
at org.springframework.integration.handler.ExpressionCommandMessageProcessor.processMessage(ExpressionCommandMessageProcessor.java:76)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
... 7 more
我做错了什么?
更新:
我向前做了几个步骤,我的配置如下所示:
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from("operationChannel")
.controlBus()
.get();
}
@Bean
public AbstractMessageChannel adapterOutputChanel() {
return new QueueChannel();
}
@Bean
@InboundChannelAdapter(channel = "adapterOutputChanel", autoStartup = "false", poller = @Poller(fixedDelay = "1000"))
public MessageSource inboundAdapter() {
return new MyMessageSource();
}
public static class MyMessageSource implements MessageSource<String>, Lifecycle {
private volatile boolean started;
@Override
public Message receive() {
if (!isRunning()) {
return null;
}
return new Message() {
@Override
public String getPayload() {
return "some_output_message";
}
@Override
public MessageHeaders getHeaders() {
return null;
}
};
}
@Override
public void start() {
started = true;
}
@Override
public void stop() {
started = false;
}
@Override
public boolean isRunning() {
return started;
}
}
}
当我执行时:
controlChannel.send(new GenericMessage<String>("@inboundAdapter.start()"));
MyMessageSource#start 被调用
stop
也是如此,但是 MyMessageSource#receive
没有被调用,所以
adapterOutputChanel.receive(1000)
总是returnsnull
解决方案:
在 Artem Bilan 回答后我有以下配置:
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from("operationChannel")
.controlBus()
.get();
}
@Bean
public AbstractMessageChannel adapterOutputChanel() {
return new QueueChannel();
}
@Bean
@InboundChannelAdapter(channel = "adapterOutputChanel", autoStartup = "false", poller = @Poller(fixedDelay = "1000"))
@EndpointId("myInboundAdapter")
public MessageSource inboundAdapter() {
return new MyMessageSource();
}
public static class MyMessageSource implements MessageSource<String> {
@Override
public Message receive() {
return new Message() {
@Override
public String getPayload() {
return "some_output_message";
}
@Override
public MessageHeaders getHeaders() {
return new MessageHeaders(new HashMap());
}
@Override
public String toString() {
return getPayload() + ", " + getHeaders();
}
};
}
}
}
应用输出:
2019-08-26 17:20:54.087 INFO 11792 --- [ main] control_bus.MyApplication : Started MyApplication in 1.526 seconds (JVM running for 2.843)
Before start:null
2019-08-26 17:20:55.093 INFO 11792 --- [ main] o.s.i.e.SourcePollingChannelAdapter : started myInboundAdapter
After start:some_output_message, {id=857f4320-5158-6daa-8a03-3a0182436a78, timestamp=1566829255098}
2019-08-26 17:20:55.098 INFO 11792 --- [ main] o.s.i.e.SourcePollingChannelAdapter : stopped myInboundAdapter
After stop:null
根据 Java 和 Spring 中的注释配置,inboundAdapter
bean 名称(本质上是一个 bean 方法名称)被准确分配给您声明为 bean 的内容。在您的情况下,它是一个 MessageSource
实现。您确实需要在控制总线命令中处理通过 @InboundChannelAdapter
分配给 MessageSource
的 SourcePollingChannelAdapter
bean。唯一的问题是我们需要找出一个正确的 bean 名称以从命令中引用它:
The AbstractEndpoint bean name is generated with the following pattern: [configurationComponentName].[methodName].[decapitalizedAnnotationClassShortName]. For example, the SourcePollingChannelAdapter endpoint for the consoleSource() definition shown earlier gets a bean name of myFlowConfiguration.consoleSource.inboundChannelAdapter. See also Endpoint Bean Names.
因此,我建议您采用 Endpoint Bean Names 建议并使用 @EndpointId
以及 @InboundChannelAdapter
:
@Bean
@InboundChannelAdapter(channel = "adapterOutputChanel", autoStartup = "false", poller = @Poller(fixedDelay = "1000"))
@EndpointId("myInboundAdapter")
public MessageSource<String> inboundAdapter() {
因此,您的控制总线命令将如下所示:"@myInboundAdapter.start()"
更新
MessageSource
线路的 Java DSL 变体:
@Bean
public IntegrationFlow channelAdapterFlow() {
return IntegrationFlows.from(new MyMessageSource(),
e -> e.id("myInboundAdapter").autoStartup(false).poller(p -> p.fixedDelay(100)))
.channel(adapterOutputChanel())
.get();
}
我想使用 spring 集成开发控制总线示例。作为示例,我采用了以下示例:https://github.com/spring-projects/spring-integration-samples/tree/master/basic/control-bus
我决定做同样的事情,但使用 java DSL。
我还阅读了以下主题:
现在我有以下源代码:
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from("operationChannel")
.controlBus()
.get();
}
@Bean
@InboundChannelAdapter(channel = "adapterOutputChanel", autoStartup = "false", poller = @Poller(fixedDelay = "1000"))
public MessageSource<String> inboundAdapter() {
return new MessageSource<String>() {
@Override
public Message receive() {
return new Message() {
@Override
public String getPayload() {
return "some_output_message";
}
@Override
public MessageHeaders getHeaders() {
return null;
}
};
}
};
}
@Bean
public AbstractMessageChannel adapterOutputChanel() {
return new QueueChannel();
}
}
和应用:
@SpringBootApplication
public class MyApplication {
public static void main(String[] args) {
ConfigurableApplicationContext ctx = new SpringApplication(MyApplication.class).run(args);
MessageChannel controlChannel = ctx.getBean("operationChannel", MessageChannel.class);
PollableChannel adapterOutputChanel = ctx.getBean("adapterOutputChanel", PollableChannel.class);
controlChannel.send(new GenericMessage<String>("@inboundAdapter.start()"));
adapterOutputChanel.receive(1000);
}
}
但是当我启动应用程序时,我看到以下日志:
2019-08-26 16:09:30.901 INFO 10532 --- [ main] control_bus.MyApplication : Started MyApplication in 1.248 seconds (JVM running for 2.401)
Exception in thread "main" org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@7351a16e] (controlBusFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0)]; nested exception is org.springframework.expression.EvaluationException: The method 'start' is not supported by this command processor. If using the Control Bus, consider adding @ManagedOperation or @ManagedAttribute., failedMessage=GenericMessage [payload=@'inboundAdapter'.start(), headers={id=aef8f0dc-c3f5-5f7b-1a6d-7c9041f2d000, timestamp=1566824970903}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:186)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at control_bus.MyApplication.main(MyApplication.java:18)
Caused by: org.springframework.expression.EvaluationException: The method 'start' is not supported by this command processor. If using the Control Bus, consider adding @ManagedOperation or @ManagedAttribute.
at org.springframework.integration.handler.ExpressionCommandMessageProcessor$ExpressionCommandMethodResolver.validateMethod(ExpressionCommandMessageProcessor.java:114)
at org.springframework.integration.handler.ExpressionCommandMessageProcessor$ExpressionCommandMethodResolver.resolve(ExpressionCommandMessageProcessor.java:95)
at org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:205)
at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:134)
at org.springframework.expression.spel.ast.MethodReference.access[=13=]0(MethodReference.java:54)
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:390)
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:90)
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:114)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:365)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:156)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:151)
at org.springframework.integration.handler.ExpressionCommandMessageProcessor.processMessage(ExpressionCommandMessageProcessor.java:76)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
... 7 more
我做错了什么?
更新:
我向前做了几个步骤,我的配置如下所示:
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from("operationChannel")
.controlBus()
.get();
}
@Bean
public AbstractMessageChannel adapterOutputChanel() {
return new QueueChannel();
}
@Bean
@InboundChannelAdapter(channel = "adapterOutputChanel", autoStartup = "false", poller = @Poller(fixedDelay = "1000"))
public MessageSource inboundAdapter() {
return new MyMessageSource();
}
public static class MyMessageSource implements MessageSource<String>, Lifecycle {
private volatile boolean started;
@Override
public Message receive() {
if (!isRunning()) {
return null;
}
return new Message() {
@Override
public String getPayload() {
return "some_output_message";
}
@Override
public MessageHeaders getHeaders() {
return null;
}
};
}
@Override
public void start() {
started = true;
}
@Override
public void stop() {
started = false;
}
@Override
public boolean isRunning() {
return started;
}
}
}
当我执行时:
controlChannel.send(new GenericMessage<String>("@inboundAdapter.start()"));
MyMessageSource#start 被调用
stop
也是如此,但是 MyMessageSource#receive
没有被调用,所以
adapterOutputChanel.receive(1000)
总是returnsnull
解决方案:
在 Artem Bilan 回答后我有以下配置:
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from("operationChannel")
.controlBus()
.get();
}
@Bean
public AbstractMessageChannel adapterOutputChanel() {
return new QueueChannel();
}
@Bean
@InboundChannelAdapter(channel = "adapterOutputChanel", autoStartup = "false", poller = @Poller(fixedDelay = "1000"))
@EndpointId("myInboundAdapter")
public MessageSource inboundAdapter() {
return new MyMessageSource();
}
public static class MyMessageSource implements MessageSource<String> {
@Override
public Message receive() {
return new Message() {
@Override
public String getPayload() {
return "some_output_message";
}
@Override
public MessageHeaders getHeaders() {
return new MessageHeaders(new HashMap());
}
@Override
public String toString() {
return getPayload() + ", " + getHeaders();
}
};
}
}
}
应用输出:
2019-08-26 17:20:54.087 INFO 11792 --- [ main] control_bus.MyApplication : Started MyApplication in 1.526 seconds (JVM running for 2.843)
Before start:null
2019-08-26 17:20:55.093 INFO 11792 --- [ main] o.s.i.e.SourcePollingChannelAdapter : started myInboundAdapter
After start:some_output_message, {id=857f4320-5158-6daa-8a03-3a0182436a78, timestamp=1566829255098}
2019-08-26 17:20:55.098 INFO 11792 --- [ main] o.s.i.e.SourcePollingChannelAdapter : stopped myInboundAdapter
After stop:null
根据 Java 和 Spring 中的注释配置,inboundAdapter
bean 名称(本质上是一个 bean 方法名称)被准确分配给您声明为 bean 的内容。在您的情况下,它是一个 MessageSource
实现。您确实需要在控制总线命令中处理通过 @InboundChannelAdapter
分配给 MessageSource
的 SourcePollingChannelAdapter
bean。唯一的问题是我们需要找出一个正确的 bean 名称以从命令中引用它:
The AbstractEndpoint bean name is generated with the following pattern: [configurationComponentName].[methodName].[decapitalizedAnnotationClassShortName]. For example, the SourcePollingChannelAdapter endpoint for the consoleSource() definition shown earlier gets a bean name of myFlowConfiguration.consoleSource.inboundChannelAdapter. See also Endpoint Bean Names.
因此,我建议您采用 Endpoint Bean Names 建议并使用 @EndpointId
以及 @InboundChannelAdapter
:
@Bean
@InboundChannelAdapter(channel = "adapterOutputChanel", autoStartup = "false", poller = @Poller(fixedDelay = "1000"))
@EndpointId("myInboundAdapter")
public MessageSource<String> inboundAdapter() {
因此,您的控制总线命令将如下所示:"@myInboundAdapter.start()"
更新
MessageSource
线路的 Java DSL 变体:
@Bean
public IntegrationFlow channelAdapterFlow() {
return IntegrationFlows.from(new MyMessageSource(),
e -> e.id("myInboundAdapter").autoStartup(false).poller(p -> p.fixedDelay(100)))
.channel(adapterOutputChanel())
.get();
}