Spring XD, Reactor Streams: configuration without XML?

The Spring XD reference at http://docs.spring.io/spring-xd/docs/1.2.1.RELEASE/reference/html/#reactor-streams documents how to implement a Reactor Stream in Spring XD. Following that documentation, I built my own application:

/src/main/java/mypackage/MyReactorStream.java:

...
import org.springframework.xd.reactor.Processor;
import reactor.rx.Stream;

public class MyReactorStream implements Processor<SomePOJO, SomePOJO> {

    private int intParam;
    public void setIntParam(int intParam) { ... }

    @Override
    public Stream<SomePOJO> process(Stream<SomePOJO> inputStream) { ... }

}

/src/main/resources/config/config.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans ...>

    <bean id="messageProcessor" class="mypackage.MyReactorStream">
        <property name="intParam" value="${intParam}" />
    </bean>

    <!-- The rest is boilerplate that XD 1.1 RC1 will avoid you having to provide -->

    <int:channel id="input" />

    <bean name="messageHandler"
        class="org.springframework.xd.reactor.BroadcasterMessageHandler">
        <constructor-arg ref="messageProcessor" />
    </bean>

    <int:service-activator input-channel="input"
        ref="messageHandler" output-channel="output" />

    <int:channel id="output" />

</beans>

I would like to use @Configuration and Spring annotations on my Java class instead of the XML configuration. How can this be done?

For examples of custom modules that use Java configuration, see the DSL and Tweet examples.

The following works.

/src/main/java/mypackage/ModuleConfiguration.java:

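...
import javax.validation.constraints.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.xd.module.options.spi.ModuleOption;
import org.springframework.xd.reactor.BroadcasterMessageHandler;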

@Configuration
@EnableIntegration
@ComponentScan(value = { "mypackage" })
public class ModuleConfiguration {

    private int intParam;

    @NotNull
    public int getIntParam() {
        return intParam;
    }

    @ModuleOption("help text that appears if you type 'module info ...' in the XD shell")
    public void setIntParam(int intParam) {
        this.intParam = intParam;
    }

    @Autowired
    private MyReactorStream myReactorStream;

    @Bean
    public MessageChannel input() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel output() {
        return new DirectChannel();
    }

    @Bean
    MyReactorStream myReactorStream() {
        return new MyReactorStream();
    }

    @Bean
    @ServiceActivator(inputChannel = "input")
    MessageHandler messageHandler() {
        BroadcasterMessageHandler handler = new BroadcasterMessageHandler(myReactorStream);
        handler.setOutputChannel(output());
        return handler;
    }    
}

/src/main/java/mypackage/MyReactorStream.java:

...
import org.springframework.beans.factory.annotation.Value;
import org.springframework.xd.reactor.Processor;
import reactor.rx.Stream;

public class MyReactorStream implements Processor<SomePOJO, SomePOJO> {

    @Value("${intParam}")
    private int intParam;

    @Override
    public Stream<SomePOJO> process(Stream<SomePOJO> inputStream) {
        return inputStream.map(pojo -> {
            ...
            pojo.setIntParam(intParam);
            return pojo;
        });
    }
}
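
The per-element logic inside process can be checked without an XD container. The sketch below is only an illustration under stated assumptions: it uses java.util.stream.Stream as a stand-in for reactor.rx.Stream (both offer a map operator), SomePOJO is a hypothetical minimal class with just an intParam property because the original type is not shown, and MappingSketch and run are names made up for this example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for the SomePOJO type used in the answer.
class SomePOJO {
    private int intParam;
    public int getIntParam() { return intParam; }
    public void setIntParam(int intParam) { this.intParam = intParam; }
}

// Mirrors the body of MyReactorStream.process, but on java.util.stream.Stream
// so that it runs with the JDK alone (an assumption made for illustration).
class MappingSketch {
    static Stream<SomePOJO> process(Stream<SomePOJO> input, int intParam) {
        return input.map(pojo -> {
            pojo.setIntParam(intParam); // stamp the module option onto each element
            return pojo;
        });
    }

    // Builds `count` fresh POJOs, runs them through process, and collects
    // the intParam value seen on each output element.
    static List<Integer> run(int count, int intParam) {
        Stream<SomePOJO> input = Stream.generate(SomePOJO::new).limit(count);
        return process(input, intParam)
                .map(SomePOJO::getIntParam)
                .collect(Collectors.toList());
    }
}
```

Passing intParam as an argument here replaces the @Value injection, which only happens inside a Spring context.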

/src/main/resources/config/spring-module.properties:

options_class = mypackage.ModuleConfiguration
base_packages=mypackage

No XML configuration is needed.
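
The two entries in spring-module.properties above use different spacing around the equals sign. Assuming the file is read with standard java.util.Properties semantics (the answer does not say how XD loads it), both forms parse to the same kind of key/value pair, as this small sketch suggests. ModulePropertiesSketch and its members are hypothetical names used only here.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

class ModulePropertiesSketch {
    // Same two entries as the properties file in the answer, spacing preserved.
    static final String CONTENT =
            "options_class = mypackage.ModuleConfiguration\n"
          + "base_packages=mypackage\n";

    // Parses properties text using the standard java.util.Properties rules,
    // which skip whitespace around the key/value separator.
    static Properties load(String content) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(content));
        } catch (IOException e) {
            // Not expected when reading from an in-memory string.
            throw new IllegalStateException(e);
        }
        return props;
    }
}
```

Calling load(CONTENT).getProperty("options_class") yields the class name without surrounding spaces, so the mixed spacing in the file is cosmetic under this assumption.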