Spring XD, Reactor Streams with @Configuration: NPE

I am trying to implement a Reactor stream in Spring XD without XML configuration:

MyProcessor.java:

...
import org.springframework.xd.reactor.Processor;
import reactor.rx.Stream;

public class MyProcessor implements Processor<MyPojo, MyPojo> {

    @Override
    public Stream<MyPojo> process(Stream<MyPojo> inputStream) {
        return inputStream.map(pojo-> {
            return pojo;
        });
    }

}

ModuleConfiguration.java:

...
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.xd.reactor.BroadcasterMessageHandler;

@Configuration
@EnableIntegration
@ComponentScan(value = { "myPackage" })
public class ModuleConfiguration {

    @Autowired
    private MyProcessor myProcessor;

    @Bean
    public MessageChannel input() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel output() {
        return new DirectChannel();
    }

    @Bean
    MyProcessor myProcessor() {
        return new MyProcessor();
    }

    @Bean
    @ServiceActivator(inputChannel = "input", outputChannel = "output")
    MessageHandler messageHandler() {
        return new BroadcasterMessageHandler(myProcessor);
    }

}

In addition, a spring-module.properties file is maintained.

After starting Spring XD, creating the stream, and sending a MyPojo, we get this exception:

2015-09-08T14:50:42+0200 1.2.1.RELEASE ERROR xd-reactor-1 reactor.BroadcasterMessageHandler - Error processing stream [{push}]
java.lang.NullPointerException: null
        at org.springframework.xd.reactor.AbstractReactorMessageHandler$ChannelForwardingSubscriber.onNext(AbstractReactorMessageHandler.java:142) ~[spring-xd-reactor-1.2.2.BUILD-SNAPSHOT.jar:1.2.2.BUILD-SNAPSHOT]
        at reactor.rx.subscription.PushSubscription.onNext(PushSubscription.java:111) ~[reactor-stream-2.0.4.RELEASE.jar:na]
        at reactor.rx.action.Action.broadcastNext(Action.java:267) [reactor-stream-2.0.4.RELEASE.jar:na]
        at reactor.rx.action.transformation.MapAction.doNext(MapAction.java:39) [reactor-stream-2.0.4.RELEASE.jar:na]
        at reactor.rx.action.Action.onNext(Action.java:202) [reactor-stream-2.0.4.RELEASE.jar:na]
        at reactor.core.processor.util.RingBufferSubscriberUtils.route(RingBufferSubscriberUtils.java:67) [reactor-core-2.0.4.RELEASE.jar:na]
        at reactor.core.processor.RingBufferProcessor$BatchSignalProcessor.run(RingBufferProcessor.java:789) [reactor-core-2.0.4.RELEASE.jar:na]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
Caused by: reactor.core.support.Exceptions$ValueCause: Exception while signaling value: ...

What can we do to avoid this exception?
By the way: if we use XML configuration instead of @Configuration, it works fine.

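The NPE is a bug in AbstractReactorMessageHandler (it should detect a null outputChannel).

The root cause is that the outputChannel must be injected into the handler. When @ServiceActivator is used on a @Bean, the annotation's outputChannel attribute is ignored, so the handler's outputChannel stays null.

See the NOTE in the documentation for more information.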

@Bean
@ServiceActivator(inputChannel = "input")
MessageHandler messageHandler() {
    BroadcasterMessageHandler handler = new BroadcasterMessageHandler(myProcessor);
    // Set the output channel on the handler itself; the annotation attribute is ignored here.
    handler.setOutputChannel(output());
    return handler;
}

A service activator consists of 2 beans, a consumer and a message handler; message handler properties must be set on the handler bean itself.

I created a JIRA issue to have the handler detect this misconfiguration.
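
To illustrate what such a check could look like, here is a minimal plain-Java sketch. It is not the Spring XD code: ForwardingHandler, its afterPropertiesSet() method, and the Consumer standing in for a message channel are all hypothetical names I made up for the example. The idea is only that the handler validates its configuration once at startup instead of failing with an NPE on the first message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a handler that forwards results to an output channel.
// Not the Spring XD class; it only illustrates a fail-fast configuration check.
class ForwardingHandler {

    // Stays null unless setOutputChannel(...) is called explicitly.
    private Consumer<String> outputChannel;

    void setOutputChannel(Consumer<String> outputChannel) {
        this.outputChannel = outputChannel;
    }

    // Meant to be called once after configuration, before any message is handled.
    void afterPropertiesSet() {
        if (outputChannel == null) {
            throw new IllegalStateException("'outputChannel' must be set on the handler");
        }
    }

    void handle(String payload) {
        outputChannel.accept(payload);
    }
}

class FailFastDemo {
    public static void main(String[] args) {
        // Misconfigured handler: rejected at startup with a clear message.
        ForwardingHandler misconfigured = new ForwardingHandler();
        try {
            misconfigured.afterPropertiesSet();
        } catch (IllegalStateException e) {
            System.out.println("Rejected at startup: " + e.getMessage());
        }

        // Correctly configured handler: the payload reaches the output.
        List<String> received = new ArrayList<>();
        ForwardingHandler configured = new ForwardingHandler();
        configured.setOutputChannel(received::add);
        configured.afterPropertiesSet();
        configured.handle("hello");
        System.out.println(received);
    }
}
```

With a check like this, the missing setOutputChannel(...) call would show up when the module is deployed rather than as a stack trace from the Reactor thread.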

In the 4.2 release, the outputChannel on the annotation is detected.