错误 运行 Spring 集成 RSocket 示例与 Spring 引导
Error running Spring Integration RSocket sample with Spring Boot
我试图创建一个简单的 Spring 引导应用程序来测试 Spring Rsocket 的集成。
我将测试文件夹中的示例代码从 spring-integrarion-rsocket
复制到我的 Spring 启动应用程序。
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
</dependency>
和application.properties
:
spring.rsocket.server.transport=tcp
server.port=7000
主应用class.
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
@Configuration
class DemoIntegrationConfig {
@Bean
public ServerRSocketConnector serverRSocketConnector() {
return new ServerRSocketConnector("localhost", 0);
}
@Bean
public ClientRSocketConnector clientRSocketConnector(ServerRSocketConnector serverRSocketConnector) {
int port = serverRSocketConnector.getBoundPort().block();
ClientRSocketConnector clientRSocketConnector = new ClientRSocketConnector("localhost", port);
clientRSocketConnector.setAutoStartup(false);
return clientRSocketConnector;
}
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlows
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.command((message) -> RSocketOutboundGateway.Command.requestStreamOrChannel)
.expectedResponseType("T(java.lang.String)")
.clientRSocketConnector(clientRSocketConnector))
.get();
}
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlows
.from(RSockets.inboundGateway("/uppercase"))
.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
.get();
}
}
和测试代码。
@SpringBootTest
class DemoApplicationTests {
@Autowired
@Qualifier("rsocketUpperCaseRequestFlow.gateway")
private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;
@Test
void testRsocketUpperCaseFlows() {
Flux<String> result = this.rsocketUpperCaseFlowFunction.apply(Flux.just("a\n", "b\n", "c\n"));
StepVerifier.create(result)
.expectNext("A", "B", "C")
.verifyComplete();
}
}
当我 运行 测试时,它抛出异常:没有目标 '/uppercase' 的处理程序。
2019-10-30 12:59:28.617 INFO 2800 --- [ main] com.example.demo.DemoApplicationTests : Started DemoApplicationTests in 4.778 seconds (JVM running for 6.408)
org.springframework.messaging.MessageDeliveryException: No handler for destination '/uppercase'
at org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler.handleNoMatch(RSocketMessageHandler.java:309)
at org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.getHandlerMethod(AbstractMethodMessageHandler.java:445)
at org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:417)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.lambda$handleAndReply(MessagingRSocket.java:173)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
at reactor.core.publisher.Mono.subscribe(Mono.java:4087)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:207)
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
at reactor.core.publisher.Flux.subscribe(Flux.java:8134)
at reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstInner.onNext(FluxSwitchOnFirst.java:180)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.UnicastProcessor.drainRegular(UnicastProcessor.java:240)
at reactor.core.publisher.UnicastProcessor.drain(UnicastProcessor.java:312)
at reactor.core.publisher.UnicastProcessor.subscribe(UnicastProcessor.java:427)
at reactor.core.publisher.Flux.subscribe(Flux.java:8134)
at io.rsocket.internal.RateLimitableRequestPublisher.subscribe(RateLimitableRequestPublisher.java:74)
at io.rsocket.RSocketResponder.handleStream(RSocketResponder.java:446)
at io.rsocket.RSocketResponder.handleChannel(RSocketResponder.java:502)
at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:315)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696)
at reactor.core.publisher.Flux.subscribe(Flux.java:8134)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1592)
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317)
at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new(ClientServerInputMultiplexer.java:116)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380)
at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316)
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:206)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:329)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:91)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:1044)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 6.961 s <<< FAILURE! - in com.example.demo.DemoApplicationTests
[ERROR] testRsocketUpperCaseFlows Time elapsed: 0.894 s <<< FAILURE!
源代码托管在 my Github。
顺便说一句。我必须 运行 这个应用程序作为一个嵌入式 RSocket web flux 应用程序(或者 运行 它作为一个独立的 RSocket 应用程序),如果我 运行在IntegrationConfig中配置了相同端口的RSocket服务器,我启动应用程序时出现端口绑定错误,看来ServerRSocketConnector
会创建自己的RSocket服务器。
更新:2/27/2020:
我尝试删除依赖项中的spring-boot-starter-rsocket并创建了一个RestController
来与网关通信
pom.xml.
的更新依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
</dependency>
并清理 application.properties.
中的配置
样本RestController
.
@RestController
class HelloController {
@Autowired()
@Lazy
@Qualifier("rsocketUpperCaseRequestFlow.gateway")
private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;
@GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> uppercase() {
return rsocketUpperCaseFlowFunction.apply(Flux.just("a", "b", "c", "d"));
}
}
当我运行应用程序时,我遇到了这样的异常。
org.springframework.messaging.MessageDeliveryException: Destination '/uppercase' does not support REQUEST_CHANNEL. Supported interaction(s): [SETUP, METADATA_PUSH]
at org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler.handleNoMatch(RSocketMessageHandler.java:389)
at org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.getHandlerMethod(AbstractMethodMessageHandler.java:476)
at org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:444)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.lambda$handleAndReply(MessagingRSocket.java:173)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
at reactor.core.publisher.Mono.subscribe(Mono.java:4105)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:207)
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
at reactor.core.publisher.Flux.subscribe(Flux.java:8174)
at reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstInner.onNext(FluxSwitchOnFirst.java:180)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.UnicastProcessor.drainRegular(UnicastProcessor.java:240)
at reactor.core.publisher.UnicastProcessor.drain(UnicastProcessor.java:312)
at reactor.core.publisher.UnicastProcessor.subscribe(UnicastProcessor.java:427)
at reactor.core.publisher.Flux.subscribe(Flux.java:8174)
at io.rsocket.internal.RateLimitableRequestPublisher.subscribe(RateLimitableRequestPublisher.java:74)
at io.rsocket.RSocketResponder.handleStream(RSocketResponder.java:446)
at io.rsocket.RSocketResponder.handleChannel(RSocketResponder.java:502)
at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:315)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696)
at reactor.core.publisher.Flux.subscribe(Flux.java:8174)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637)
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317)
at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new(ClientServerInputMultiplexer.java:116)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380)
at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316)
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
java.lang.AssertionError: expectation "expectNext(A)" failed (expected: onNext(A); actual: onError(io.rsocket.exceptions.ApplicationErrorException: Destination '/uppercase' does not support REQUEST_CHANNEL. Supported interaction(s): [SETUP, METADATA_PUSH]))
at reactor.test.MessageFormatter.assertionError(MessageFormatter.java:115)
at reactor.test.MessageFormatter.failPrefix(MessageFormatter.java:104)
at reactor.test.MessageFormatter.fail(MessageFormatter.java:73)
at reactor.test.MessageFormatter.failOptional(MessageFormatter.java:88)
at reactor.test.DefaultStepVerifierBuilder.lambda$addExpectedValue(DefaultStepVerifierBuilder.java:501)
at reactor.test.DefaultStepVerifierBuilder$SignalEvent.test(DefaultStepVerifierBuilder.java:2211)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSignal(DefaultStepVerifierBuilder.java:1483)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onExpectation(DefaultStepVerifierBuilder.java:1431)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onError(DefaultStepVerifierBuilder.java:1091)
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onError(FluxDoFinally.java:129)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:227)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:227)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:227)
at reactor.core.publisher.UnicastProcessor.checkTerminated(UnicastProcessor.java:334)
at reactor.core.publisher.UnicastProcessor.drainRegular(UnicastProcessor.java:232)
at reactor.core.publisher.UnicastProcessor.drain(UnicastProcessor.java:312)
at reactor.core.publisher.UnicastProcessor.onError(UnicastProcessor.java:401)
at io.rsocket.RSocketRequester.handleFrame(RSocketRequester.java:556)
at io.rsocket.RSocketRequester.handleIncomingFrames(RSocketRequester.java:516)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onNext(FluxGroupBy.java:670)
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:205)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:308)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:422)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Suppressed: io.rsocket.exceptions.ApplicationErrorException: Destination '/uppercase' does not support REQUEST_CHANNEL. Supported interaction(s): [SETUP, METADATA_PUSH]
at io.rsocket.exceptions.Exceptions.from(Exceptions.java:45)
... 37 more
我刚刚使用您的所有代码从 https://start.spring.io 创建了一个新项目,它对我来说效果很好。我用Spring开机2.2.0
.
尽管我发现 RSocket 自动配置与我们拥有的 ServerRSocketConnector
之间没有关联。
我们需要以某种方式使 ServerRSocketConnector
基于我们在 Spring Boot 中自动配置的内容以支持 RSocket。欢迎在 Spring Boot 中就此事提出问题!然后我们将尝试弄清楚 and/or 在 Spring 集成中必须实施什么以及如何实施。
我试图创建一个简单的 Spring 引导应用程序来测试 Spring Rsocket 的集成。
我将测试文件夹中的示例代码从 spring-integrarion-rsocket
复制到我的 Spring 启动应用程序。
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
</dependency>
和application.properties
:
spring.rsocket.server.transport=tcp
server.port=7000
主应用class.
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
@Configuration
class DemoIntegrationConfig {
@Bean
public ServerRSocketConnector serverRSocketConnector() {
return new ServerRSocketConnector("localhost", 0);
}
@Bean
public ClientRSocketConnector clientRSocketConnector(ServerRSocketConnector serverRSocketConnector) {
int port = serverRSocketConnector.getBoundPort().block();
ClientRSocketConnector clientRSocketConnector = new ClientRSocketConnector("localhost", port);
clientRSocketConnector.setAutoStartup(false);
return clientRSocketConnector;
}
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlows
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.command((message) -> RSocketOutboundGateway.Command.requestStreamOrChannel)
.expectedResponseType("T(java.lang.String)")
.clientRSocketConnector(clientRSocketConnector))
.get();
}
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlows
.from(RSockets.inboundGateway("/uppercase"))
.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
.get();
}
}
和测试代码。
@SpringBootTest
class DemoApplicationTests {
@Autowired
@Qualifier("rsocketUpperCaseRequestFlow.gateway")
private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;
@Test
void testRsocketUpperCaseFlows() {
Flux<String> result = this.rsocketUpperCaseFlowFunction.apply(Flux.just("a\n", "b\n", "c\n"));
StepVerifier.create(result)
.expectNext("A", "B", "C")
.verifyComplete();
}
}
当我 运行 测试时,它抛出异常:没有目标 '/uppercase' 的处理程序。
2019-10-30 12:59:28.617 INFO 2800 --- [ main] com.example.demo.DemoApplicationTests : Started DemoApplicationTests in 4.778 seconds (JVM running for 6.408)
org.springframework.messaging.MessageDeliveryException: No handler for destination '/uppercase'
at org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler.handleNoMatch(RSocketMessageHandler.java:309)
at org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.getHandlerMethod(AbstractMethodMessageHandler.java:445)
at org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:417)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.lambda$handleAndReply(MessagingRSocket.java:173)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
at reactor.core.publisher.Mono.subscribe(Mono.java:4087)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:207)
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
at reactor.core.publisher.Flux.subscribe(Flux.java:8134)
at reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstInner.onNext(FluxSwitchOnFirst.java:180)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.UnicastProcessor.drainRegular(UnicastProcessor.java:240)
at reactor.core.publisher.UnicastProcessor.drain(UnicastProcessor.java:312)
at reactor.core.publisher.UnicastProcessor.subscribe(UnicastProcessor.java:427)
at reactor.core.publisher.Flux.subscribe(Flux.java:8134)
at io.rsocket.internal.RateLimitableRequestPublisher.subscribe(RateLimitableRequestPublisher.java:74)
at io.rsocket.RSocketResponder.handleStream(RSocketResponder.java:446)
at io.rsocket.RSocketResponder.handleChannel(RSocketResponder.java:502)
at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:315)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696)
at reactor.core.publisher.Flux.subscribe(Flux.java:8134)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1592)
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317)
at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new(ClientServerInputMultiplexer.java:116)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380)
at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316)
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:206)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:329)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:91)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:1044)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 6.961 s <<< FAILURE! - in com.example.demo.DemoApplicationTests
[ERROR] testRsocketUpperCaseFlows Time elapsed: 0.894 s <<< FAILURE!
源代码托管在 my Github。
顺便说一句。我必须 运行 这个应用程序作为一个嵌入式 RSocket web flux 应用程序(或者 运行 它作为一个独立的 RSocket 应用程序),如果我 运行在IntegrationConfig中配置了相同端口的RSocket服务器,我启动应用程序时出现端口绑定错误,看来ServerRSocketConnector
会创建自己的RSocket服务器。
更新:2/27/2020:
我尝试删除依赖项中的spring-boot-starter-rsocket并创建了一个RestController
来与网关通信
pom.xml.
的更新依赖 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
</dependency>
并清理 application.properties.
中的配置样本RestController
.
@RestController
class HelloController {
@Autowired()
@Lazy
@Qualifier("rsocketUpperCaseRequestFlow.gateway")
private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;
@GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> uppercase() {
return rsocketUpperCaseFlowFunction.apply(Flux.just("a", "b", "c", "d"));
}
}
当我运行应用程序时,我遇到了这样的异常。
org.springframework.messaging.MessageDeliveryException: Destination '/uppercase' does not support REQUEST_CHANNEL. Supported interaction(s): [SETUP, METADATA_PUSH]
at org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler.handleNoMatch(RSocketMessageHandler.java:389)
at org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.getHandlerMethod(AbstractMethodMessageHandler.java:476)
at org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:444)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.lambda$handleAndReply(MessagingRSocket.java:173)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
at reactor.core.publisher.Mono.subscribe(Mono.java:4105)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:207)
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
at reactor.core.publisher.Flux.subscribe(Flux.java:8174)
at reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstInner.onNext(FluxSwitchOnFirst.java:180)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.UnicastProcessor.drainRegular(UnicastProcessor.java:240)
at reactor.core.publisher.UnicastProcessor.drain(UnicastProcessor.java:312)
at reactor.core.publisher.UnicastProcessor.subscribe(UnicastProcessor.java:427)
at reactor.core.publisher.Flux.subscribe(Flux.java:8174)
at io.rsocket.internal.RateLimitableRequestPublisher.subscribe(RateLimitableRequestPublisher.java:74)
at io.rsocket.RSocketResponder.handleStream(RSocketResponder.java:446)
at io.rsocket.RSocketResponder.handleChannel(RSocketResponder.java:502)
at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:315)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696)
at reactor.core.publisher.Flux.subscribe(Flux.java:8174)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637)
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317)
at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new(ClientServerInputMultiplexer.java:116)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380)
at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316)
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
java.lang.AssertionError: expectation "expectNext(A)" failed (expected: onNext(A); actual: onError(io.rsocket.exceptions.ApplicationErrorException: Destination '/uppercase' does not support REQUEST_CHANNEL. Supported interaction(s): [SETUP, METADATA_PUSH]))
at reactor.test.MessageFormatter.assertionError(MessageFormatter.java:115)
at reactor.test.MessageFormatter.failPrefix(MessageFormatter.java:104)
at reactor.test.MessageFormatter.fail(MessageFormatter.java:73)
at reactor.test.MessageFormatter.failOptional(MessageFormatter.java:88)
at reactor.test.DefaultStepVerifierBuilder.lambda$addExpectedValue(DefaultStepVerifierBuilder.java:501)
at reactor.test.DefaultStepVerifierBuilder$SignalEvent.test(DefaultStepVerifierBuilder.java:2211)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSignal(DefaultStepVerifierBuilder.java:1483)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onExpectation(DefaultStepVerifierBuilder.java:1431)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onError(DefaultStepVerifierBuilder.java:1091)
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onError(FluxDoFinally.java:129)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:227)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:227)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:227)
at reactor.core.publisher.UnicastProcessor.checkTerminated(UnicastProcessor.java:334)
at reactor.core.publisher.UnicastProcessor.drainRegular(UnicastProcessor.java:232)
at reactor.core.publisher.UnicastProcessor.drain(UnicastProcessor.java:312)
at reactor.core.publisher.UnicastProcessor.onError(UnicastProcessor.java:401)
at io.rsocket.RSocketRequester.handleFrame(RSocketRequester.java:556)
at io.rsocket.RSocketRequester.handleIncomingFrames(RSocketRequester.java:516)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onNext(FluxGroupBy.java:670)
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:205)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:308)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:422)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Suppressed: io.rsocket.exceptions.ApplicationErrorException: Destination '/uppercase' does not support REQUEST_CHANNEL. Supported interaction(s): [SETUP, METADATA_PUSH]
at io.rsocket.exceptions.Exceptions.from(Exceptions.java:45)
... 37 more
我刚刚使用您的所有代码从 https://start.spring.io 创建了一个新项目,它对我来说效果很好。我用Spring开机2.2.0
.
尽管我发现 RSocket 自动配置与我们拥有的 ServerRSocketConnector
之间没有关联。
我们需要以某种方式使 ServerRSocketConnector
基于我们在 Spring Boot 中自动配置的内容以支持 RSocket。欢迎在 Spring Boot 中就此事提出问题!然后我们将尝试弄清楚 and/or 在 Spring 集成中必须实施什么以及如何实施。