EvaluationException: The method 'start' is not supported by this command processor

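I want to build on the Spring Integration control bus sample. As a starting point, I took the following example: https://github.com/spring-projects/spring-integration-samples/tree/master/basic/control-bus

I decided to do the same thing, but with the Java DSL.

I have also read a related topic, but it did not help me.

Now I have the following source code: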

@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {

    @Bean
    public IntegrationFlow controlBusFlow() {
        return IntegrationFlows.from("operationChannel")
                .controlBus()
                .get();
    }

    @Bean
    @InboundChannelAdapter(channel = "adapterOutputChanel", autoStartup = "false", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<String> inboundAdapter() {
        return new MessageSource<String>() {

            @Override
            public Message receive() {
                return new Message() {
                    @Override
                    public String getPayload() {
                        return "some_output_message";
                    }

                    @Override
                    public MessageHeaders getHeaders() {
                        return null;
                    }
                };
            }
        };
    }

    @Bean
    public AbstractMessageChannel adapterOutputChanel() {
        return new QueueChannel();
    }

}

Application:

@SpringBootApplication
public class MyApplication {
    public static void main(String[] args) {
        ConfigurableApplicationContext ctx = new SpringApplication(MyApplication.class).run(args);
        MessageChannel controlChannel = ctx.getBean("operationChannel", MessageChannel.class);
        PollableChannel adapterOutputChanel = ctx.getBean("adapterOutputChanel", PollableChannel.class);
        controlChannel.send(new GenericMessage<String>("@inboundAdapter.start()"));
        adapterOutputChanel.receive(1000);
    }
}

But when I start the application, I see the following log:

2019-08-26 16:09:30.901  INFO 10532 --- [           main] control_bus.MyApplication                : Started MyApplication in 1.248 seconds (JVM running for 2.401)
Exception in thread "main" org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@7351a16e] (controlBusFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0)]; nested exception is org.springframework.expression.EvaluationException: The method 'start' is not supported by this command processor. If using the Control Bus, consider adding @ManagedOperation or @ManagedAttribute., failedMessage=GenericMessage [payload=@'inboundAdapter'.start(), headers={id=aef8f0dc-c3f5-5f7b-1a6d-7c9041f2d000, timestamp=1566824970903}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:186)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
    at control_bus.MyApplication.main(MyApplication.java:18)
Caused by: org.springframework.expression.EvaluationException: The method 'start' is not supported by this command processor. If using the Control Bus, consider adding @ManagedOperation or @ManagedAttribute.
    at org.springframework.integration.handler.ExpressionCommandMessageProcessor$ExpressionCommandMethodResolver.validateMethod(ExpressionCommandMessageProcessor.java:114)
    at org.springframework.integration.handler.ExpressionCommandMessageProcessor$ExpressionCommandMethodResolver.resolve(ExpressionCommandMessageProcessor.java:95)
    at org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:205)
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:134)
    at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:54)
    at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:390)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:90)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:114)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:365)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:156)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:151)
    at org.springframework.integration.handler.ExpressionCommandMessageProcessor.processMessage(ExpressionCommandMessageProcessor.java:76)
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
    ... 7 more

What am I doing wrong?

Update:

I have made some progress, and my configuration now looks like this:

@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {

    @Bean
    public IntegrationFlow controlBusFlow() {
        return IntegrationFlows.from("operationChannel")
                .controlBus()
                .get();
    }

    @Bean
    public AbstractMessageChannel adapterOutputChanel() {
        return new QueueChannel();
    }

    @Bean
    @InboundChannelAdapter(channel = "adapterOutputChanel", autoStartup = "false", poller = @Poller(fixedDelay = "1000"))
    public MessageSource inboundAdapter() {
        return new MyMessageSource();
    }


    public static class MyMessageSource implements MessageSource<String>, Lifecycle {
        private volatile boolean started;

        @Override
        public Message receive() {
            if (!isRunning()) {
                return null;
            }
            return new Message() {
                @Override
                public String getPayload() {
                    return "some_output_message";
                }

                @Override
                public MessageHeaders getHeaders() {
                    return null;
                }

            };
        }

        @Override
        public void start() {
            started = true;
        }

        @Override
        public void stop() {
            started = false;
        }

        @Override
        public boolean isRunning() {
            return started;
        }
    }
}

When I execute:

    controlChannel.send(new GenericMessage<String>("@inboundAdapter.start()"));

MyMessageSource#start is invoked.

The same happens with stop, but MyMessageSource#receive is never called, so

adapterOutputChanel.receive(1000)

always returns null.

Solution:

After Artem Bilan's answer, I have the following configuration:

@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {
    @Bean
    public IntegrationFlow controlBusFlow() {
        return IntegrationFlows.from("operationChannel")
                .controlBus()
                .get();
    }
    @Bean
    public AbstractMessageChannel adapterOutputChanel() {
        return new QueueChannel();
    }
    @Bean
    @InboundChannelAdapter(channel = "adapterOutputChanel", autoStartup = "false", poller = @Poller(fixedDelay = "1000"))
    @EndpointId("myInboundAdapter")
    public MessageSource inboundAdapter() {
        return new MyMessageSource();
    }
    public static class MyMessageSource implements MessageSource<String> {
        @Override
        public Message receive() {
            return new Message() {
                @Override
                public String getPayload() {
                    return "some_output_message";
                }
                @Override
                public MessageHeaders getHeaders() {
                    return new MessageHeaders(new HashMap());
                }
                @Override
                public String toString() {
                    return getPayload() + ", " + getHeaders();
                }
            };
        }
    }
}

Application output:

2019-08-26 17:20:54.087  INFO 11792 --- [           main] control_bus.MyApplication                : Started MyApplication in 1.526 seconds (JVM running for 2.843)
Before start:null
2019-08-26 17:20:55.093  INFO 11792 --- [           main] o.s.i.e.SourcePollingChannelAdapter      : started myInboundAdapter
After start:some_output_message, {id=857f4320-5158-6daa-8a03-3a0182436a78, timestamp=1566829255098}
2019-08-26 17:20:55.098  INFO 11792 --- [           main] o.s.i.e.SourcePollingChannelAdapter      : stopped myInboundAdapter
After stop:null

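With Java and annotation configuration in Spring, the inboundAdapter bean name (essentially the bean method name) is assigned to exactly what you declare as a bean. In your case, that is the MessageSource implementation. What you really need to address in the control bus command is the SourcePollingChannelAdapter bean that @InboundChannelAdapter creates for your MessageSource. The only problem is working out the right bean name to reference it from the command: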

The AbstractEndpoint bean name is generated with the following pattern: [configurationComponentName].[methodName].[decapitalizedAnnotationClassShortName]. For example, the SourcePollingChannelAdapter endpoint for the consoleSource() definition shown earlier gets a bean name of myFlowConfiguration.consoleSource.inboundChannelAdapter. See also Endpoint Bean Names.

This is from the docs here: https://docs.spring.io/spring-integration/docs/5.2.0.BUILD-SNAPSHOT/reference/html/configuration.html#annotations_on_beans
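As a rough illustration of the naming pattern quoted above, the following plain-Java sketch assembles such a name from its three parts. The class and method names (EndpointBeanNameSketch, endpointBeanName, decapitalize) are made up for this example and are not part of Spring Integration. It also assumes that the Config class from the question is registered under the default bean name config, which may differ in your application.

```java
// Sketch of the documented naming pattern only; this is not Spring's actual implementation.
class EndpointBeanNameSketch {

    /** Lower-cases the first character, e.g. "InboundChannelAdapter" -> "inboundChannelAdapter". */
    static String decapitalize(String name) {
        if (name.isEmpty()) {
            return name;
        }
        return Character.toLowerCase(name.charAt(0)) + name.substring(1);
    }

    /** Joins the parts as [configurationComponentName].[methodName].[decapitalizedAnnotationClassShortName]. */
    static String endpointBeanName(String configurationComponentName,
                                   String methodName,
                                   String annotationClassShortName) {
        return configurationComponentName + "." + methodName + "."
                + decapitalize(annotationClassShortName);
    }

    public static void main(String[] args) {
        // The example given in the reference manual quote.
        System.out.println(endpointBeanName(
                "myFlowConfiguration", "consoleSource", "InboundChannelAdapter"));
        // Assumption: the question's Config class uses the default bean name "config".
        System.out.println(endpointBeanName(
                "config", "inboundAdapter", "InboundChannelAdapter"));
    }
}
```

Under that assumption, the generated endpoint name for the question's adapter would be config.inboundAdapter.inboundChannelAdapter, which is awkward to use in a command. That is the motivation for assigning an explicit id.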

So I suggest you follow the Endpoint Bean Names recommendation and use @EndpointId together with @InboundChannelAdapter:

@Bean
@InboundChannelAdapter(channel = "adapterOutputChanel", autoStartup = "false", poller = @Poller(fixedDelay = "1000"))
@EndpointId("myInboundAdapter")
public MessageSource<String> inboundAdapter() {
    // MyMessageSource is the implementation from the question
    return new MyMessageSource();
}

Your control bus command would then look like this: "@myInboundAdapter.start()"

Update

A Java DSL variant of the MessageSource wiring:
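To make the distinction between the two beans concrete, here is a toy model in plain Java. None of these types are Spring Integration classes: PollingEndpoint is a hypothetical stand-in for the polling endpoint, a Supplier stands in for the MessageSource, and a plain map stands in for the application context. The only point is that the poller consults the source solely while the endpoint itself is running, so the command has to target the endpoint's name.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Supplier;

// Toy model only: hypothetical stand-ins, not Spring Integration classes.
class ControlBusNamingSketch {

    /** Stand-in for the polling endpoint that wraps a message source. */
    static final class PollingEndpoint {
        private final Supplier<String> source;
        private final Queue<String> outputChannel;
        private boolean running; // starts stopped, mirroring autoStartup = "false"

        PollingEndpoint(Supplier<String> source, Queue<String> outputChannel) {
            this.source = source;
            this.outputChannel = outputChannel;
        }

        void start() { running = true; }

        void stop() { running = false; }

        /** One poller tick: the source is only consulted while the endpoint runs. */
        void tick() {
            if (running) {
                outputChannel.add(source.get());
            }
        }
    }

    static String demo() {
        Queue<String> adapterOutputChanel = new ArrayDeque<>();
        Supplier<String> messageSource = () -> "some_output_message";
        PollingEndpoint endpoint = new PollingEndpoint(messageSource, adapterOutputChanel);

        // Two distinct entries: the bean method name refers to the source,
        // while the explicit endpoint id refers to the polling endpoint.
        Map<String, Object> beans = new LinkedHashMap<>();
        beans.put("inboundAdapter", messageSource);
        beans.put("myInboundAdapter", endpoint);

        StringBuilder log = new StringBuilder();
        endpoint.tick();
        log.append("Before start:").append(adapterOutputChanel.poll()).append('\n');

        ((PollingEndpoint) beans.get("myInboundAdapter")).start();
        endpoint.tick();
        log.append("After start:").append(adapterOutputChanel.poll()).append('\n');

        ((PollingEndpoint) beans.get("myInboundAdapter")).stop();
        endpoint.tick();
        log.append("After stop:").append(adapterOutputChanel.poll()).append('\n');
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.print(demo());
    }
}
```

In this model the entry named inboundAdapter has no start method that affects polling at all, which loosely mirrors why addressing the MessageSource bean in the command does not start the adapter.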

@Bean
public IntegrationFlow channelAdapterFlow() {
    return IntegrationFlows.from(new MyMessageSource(), 
                e -> e.id("myInboundAdapter").autoStartup(false).poller(p -> p.fixedDelay(100)))
            .channel(adapterOutputChanel())
            .get();
}