Spring XD,带有@Configuration 的 Reactor 流:NPE
Spring XD, Reactor Streams with @Configuration: NPE
我尝试在没有 XML 配置的情况下在 Spring XD 中实现 Reactor Stream:
MyProcessor.java:
...
import org.springframework.xd.reactor.Processor;
import reactor.rx.Stream;
public class MyProcessor implements Processor<MyPojo, MyPojo> {
@Override
public Stream<MyPojo> process(Stream<MyPojo> inputStream) {
return inputStream.map(pojo-> {
return pojo;
});
}
}
ModuleConfiguration.java:
...
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.xd.reactor.BroadcasterMessageHandler;
@Configuration
@EnableIntegration
@ComponentScan(value = { "myPackage" })
public class ModuleConfiguration {
@Autowired
private MyProcessor myProcessor;
@Bean
public MessageChannel input() {
return new DirectChannel();
}
@Bean
public MessageChannel output() {
return new DirectChannel();
}
@Bean
MyProcessor myProcessor() {
return new MyProcessor ();
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
MessageHandler messageHandler() {
return new BroadcasterMessageHandler(myProcessor);
}
}
此外 spring-module.properties
被管理。
启动 Spring XD 后,创建流并发送 MyPojo
我们得到异常
2015-09-08T14:50:42+0200 1.2.1.RELEASE ERROR xd-reactor-1 reactor.BroadcasterMessageHandler - Error processing stream [{push}]
java.lang.NullPointerException: null
at org.springframework.xd.reactor.AbstractReactorMessageHandler$ChannelForwardingSubscriber.onNext(AbstractReactorMessageHandler.java:142) ~[spring-xd-reactor-1.2.2.BUILD-SNAPSHOT.jar:1.2.2.BUILD-SNAPSHOT]
at reactor.rx.subscription.PushSubscription.onNext(PushSubscription.java:111) ~[reactor-stream-2.0.4.RELEASE.jar:na]
at reactor.rx.action.Action.broadcastNext(Action.java:267) [reactor-stream-2.0.4.RELEASE.jar:na]
at reactor.rx.action.transformation.MapAction.doNext(MapAction.java:39) [reactor-stream-2.0.4.RELEASE.jar:na]
at reactor.rx.action.Action.onNext(Action.java:202) [reactor-stream-2.0.4.RELEASE.jar:na]
at reactor.core.processor.util.RingBufferSubscriberUtils.route(RingBufferSubscriberUtils.java:67) [reactor-core-2.0.4.RELEASE.jar:na]
at reactor.core.processor.RingBufferProcessor$BatchSignalProcessor.run(RingBufferProcessor.java:789) [reactor-core-2.0.4.RELEASE.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
Caused by: reactor.core.support.Exceptions$ValueCause: Exception while signaling value: ...
我们可以做些什么来避免这个异常?
顺便说一句:如果我们使用 XML 配置而不是 @Configuration,它工作正常。
NPE
是 AbstractReactorMessageHandler
中的错误(它应该检测到空值 outputChannel
)。
根本原因是 outputChannel
必须注入处理程序。当 @ServiceActivator
用于 @Bean
时,outputChannel
属性将被忽略。
请参阅the NOTE in the documentation了解更多信息。
@Bean
@ServiceActivator(inputChannel = "input")
MessageHandler messageHandler() {
BroadcasterMessageHandler handler new BroadcasterMessageHandler(myProcessor);
handler.setOutputChannel(output());
return handler;
}
一个服务激活器包含 2 个 bean,一个消费者和一个消息处理程序;消息处理程序属性必须在处理程序 bean 本身上进行。
我创建了一个 JIRA Issue 让处理程序检测错误配置。
在 4.2 版本中,检测到注释上的 outputChannel
。
我尝试在没有 XML 配置的情况下在 Spring XD 中实现 Reactor Stream:
MyProcessor.java:
...
import org.springframework.xd.reactor.Processor;
import reactor.rx.Stream;
public class MyProcessor implements Processor<MyPojo, MyPojo> {
@Override
public Stream<MyPojo> process(Stream<MyPojo> inputStream) {
return inputStream.map(pojo-> {
return pojo;
});
}
}
ModuleConfiguration.java:
...
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.xd.reactor.BroadcasterMessageHandler;
@Configuration
@EnableIntegration
@ComponentScan(value = { "myPackage" })
public class ModuleConfiguration {
@Autowired
private MyProcessor myProcessor;
@Bean
public MessageChannel input() {
return new DirectChannel();
}
@Bean
public MessageChannel output() {
return new DirectChannel();
}
@Bean
MyProcessor myProcessor() {
return new MyProcessor ();
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
MessageHandler messageHandler() {
return new BroadcasterMessageHandler(myProcessor);
}
}
此外 spring-module.properties
被管理。
启动 Spring XD 后,创建流并发送 MyPojo
我们得到异常
2015-09-08T14:50:42+0200 1.2.1.RELEASE ERROR xd-reactor-1 reactor.BroadcasterMessageHandler - Error processing stream [{push}]
java.lang.NullPointerException: null
at org.springframework.xd.reactor.AbstractReactorMessageHandler$ChannelForwardingSubscriber.onNext(AbstractReactorMessageHandler.java:142) ~[spring-xd-reactor-1.2.2.BUILD-SNAPSHOT.jar:1.2.2.BUILD-SNAPSHOT]
at reactor.rx.subscription.PushSubscription.onNext(PushSubscription.java:111) ~[reactor-stream-2.0.4.RELEASE.jar:na]
at reactor.rx.action.Action.broadcastNext(Action.java:267) [reactor-stream-2.0.4.RELEASE.jar:na]
at reactor.rx.action.transformation.MapAction.doNext(MapAction.java:39) [reactor-stream-2.0.4.RELEASE.jar:na]
at reactor.rx.action.Action.onNext(Action.java:202) [reactor-stream-2.0.4.RELEASE.jar:na]
at reactor.core.processor.util.RingBufferSubscriberUtils.route(RingBufferSubscriberUtils.java:67) [reactor-core-2.0.4.RELEASE.jar:na]
at reactor.core.processor.RingBufferProcessor$BatchSignalProcessor.run(RingBufferProcessor.java:789) [reactor-core-2.0.4.RELEASE.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
Caused by: reactor.core.support.Exceptions$ValueCause: Exception while signaling value: ...
我们可以做些什么来避免这个异常?
顺便说一句:如果我们使用 XML 配置而不是 @Configuration,它工作正常。
NPE
是 AbstractReactorMessageHandler
中的错误(它应该检测到空值 outputChannel
)。
根本原因是 outputChannel
必须注入处理程序。当 @ServiceActivator
用于 @Bean
时,outputChannel
属性将被忽略。
请参阅the NOTE in the documentation了解更多信息。
@Bean
@ServiceActivator(inputChannel = "input")
MessageHandler messageHandler() {
BroadcasterMessageHandler handler new BroadcasterMessageHandler(myProcessor);
handler.setOutputChannel(output());
return handler;
}
一个服务激活器包含 2 个 bean,一个消费者和一个消息处理程序;消息处理程序属性必须在处理程序 bean 本身上进行。
我创建了一个 JIRA Issue 让处理程序检测错误配置。
在 4.2 版本中,检测到注释上的 outputChannel
。