使用 Spring Boot 运行 Spring Integration RSocket 示例时出错

Error running Spring Integration RSocket sample with Spring Boot

我试图创建一个简单的 Spring Boot 应用程序,来测试 Spring Integration 对 RSocket 的支持。

我把 spring-integration-rsocket 测试目录中的示例代码复制到了我的 Spring Boot 应用程序中。

pom.xml

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-rsocket</artifactId>
        </dependency>

application.properties

spring.rsocket.server.transport=tcp
server.port=7000

主应用程序类:

/**
 * Spring Boot entry point for the demo application.
 */
@SpringBootApplication
public class DemoApplication {

    /**
     * Starts the application with this class as the primary source.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        new SpringApplication(DemoApplication.class).run(args);
    }

}

/**
 * Spring Integration wiring for an RSocket "/uppercase" round trip: a server connector,
 * a client connector aimed at it, an outbound request flow and an inbound handling flow.
 */
@Configuration
class DemoIntegrationConfig {

    /**
     * Creates the server-side connector for host {@code localhost} and port {@code 0}.
     * The port it actually binds is read back through {@code getBoundPort()} when the
     * client connector is built.
     */
    @Bean
    public ServerRSocketConnector serverRSocketConnector() {
        ServerRSocketConnector connector = new ServerRSocketConnector("localhost", 0);
        return connector;
    }

    /**
     * Creates the client-side connector that targets the port bound by the server connector.
     * Auto-startup is switched off on the returned connector.
     *
     * @param serverRSocketConnector the server connector whose bound port is used
     * @return the client connector for {@code localhost} on that port
     */
    @Bean
    public ClientRSocketConnector clientRSocketConnector(ServerRSocketConnector serverRSocketConnector) {
        // Blocks the bean factory method until the server connector reports its bound port.
        int boundPort = serverRSocketConnector.getBoundPort().block();
        ClientRSocketConnector connector = new ClientRSocketConnector("localhost", boundPort);
        connector.setAutoStartup(false);
        return connector;
    }

    /**
     * Requesting side: a flow fronted by a {@link Function} gateway that sends its input to the
     * "/uppercase" route with the {@code requestStreamOrChannel} command and expects
     * {@code String} responses.
     *
     * @param clientRSocketConnector the connector the outbound gateway sends through
     * @return the assembled request flow
     */
    @Bean
    public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
        return IntegrationFlows.from(Function.class)
                .handle(
                        RSockets.outboundGateway("/uppercase")
                                .command(request -> RSocketOutboundGateway.Command.requestStreamOrChannel)
                                .expectedResponseType("T(java.lang.String)")
                                .clientRSocketConnector(clientRSocketConnector))
                .get();
    }

    /**
     * Responding side: a flow fed by an inbound gateway on "/uppercase" that upper-cases
     * every element of the incoming {@code Flux<String>}.
     *
     * @return the assembled handling flow
     */
    @Bean
    public IntegrationFlow rsocketUpperCaseFlow() {
        return IntegrationFlows.from(RSockets.inboundGateway("/uppercase"))
                .<Flux<String>, Flux<String>>transform(
                        payloads -> payloads.map(text -> text.toUpperCase()))
                .get();
    }

}

以及测试代码:

/**
 * Boots the application context and pushes a small stream through the RSocket uppercase gateway.
 */
@SpringBootTest
class DemoApplicationTests {

    // Gateway proxy injected by the bean name "rsocketUpperCaseRequestFlow.gateway".
    @Autowired
    @Qualifier("rsocketUpperCaseRequestFlow.gateway")
    private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;

    /**
     * Sends three newline-terminated lowercase strings and expects "A", "B", "C"
     * followed by normal completion.
     */
    @Test
    void testRsocketUpperCaseFlows() {
        Flux<String> lowerCaseLines = Flux.just("a\n", "b\n", "c\n");
        Flux<String> upperCased = this.rsocketUpperCaseFlowFunction.apply(lowerCaseLines);

        StepVerifier.create(upperCased)
                .expectNext("A")
                .expectNext("B")
                .expectNext("C")
                .expectComplete()
                .verify();
    }
}

当我运行测试时,它抛出了异常:No handler for destination '/uppercase'(目标 '/uppercase' 没有对应的处理程序)。

2019-10-30 12:59:28.617  INFO 2800 --- [           main] com.example.demo.DemoApplicationTests    : Started DemoApplicationTests in 4.778 seconds (JVM running for 6.408)
org.springframework.messaging.MessageDeliveryException: No handler for destination '/uppercase'
        at org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler.handleNoMatch(RSocketMessageHandler.java:309)
        at org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.getHandlerMethod(AbstractMethodMessageHandler.java:445)
        at org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:417)
        at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.lambda$handleAndReply(MessagingRSocket.java:173)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4087)
        at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:207)
        at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8134)
        at reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstInner.onNext(FluxSwitchOnFirst.java:180)
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
        at reactor.core.publisher.UnicastProcessor.drainRegular(UnicastProcessor.java:240)
        at reactor.core.publisher.UnicastProcessor.drain(UnicastProcessor.java:312)
        at reactor.core.publisher.UnicastProcessor.subscribe(UnicastProcessor.java:427)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8134)
        at io.rsocket.internal.RateLimitableRequestPublisher.subscribe(RateLimitableRequestPublisher.java:74)
        at io.rsocket.RSocketResponder.handleStream(RSocketResponder.java:446)
        at io.rsocket.RSocketResponder.handleChannel(RSocketResponder.java:502)
        at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:315)
        at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
        at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
        at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
        at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8134)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1592)
        at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317)
        at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new(ClientServerInputMultiplexer.java:116)
        at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
        at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380)
        at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316)
        at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
        at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:206)
        at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:329)
        at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:91)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
        at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:1044)
        at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)
[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 6.961 s <<< FAILURE! - in com.example.demo.DemoApplicationTests
[ERROR] testRsocketUpperCaseFlows  Time elapsed: 0.894 s  <<< FAILURE!

源代码托管在我的 GitHub 上。

顺便说一句:我需要把这个应用程序作为嵌入在 WebFlux 应用中的 RSocket 服务来运行(或者把它作为独立的 RSocket 应用程序运行)。如果我运行的 RSocket 服务器使用与 IntegrationConfig 中配置的相同端口,启动应用程序时就会出现端口绑定错误,看来 ServerRSocketConnector 会创建它自己的 RSocket 服务器。

更新:2/27/2020

我尝试从依赖项中移除 spring-boot-starter-rsocket,并创建了一个 RestController 来与网关通信。

pom.xml 中更新后的依赖:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-rsocket</artifactId>
        </dependency>

并清理了 application.properties 中的配置。

示例 RestController:


/**
 * Controller that exposes the RSocket uppercase gateway over HTTP.
 */
@RestController
class HelloController {

    // Lazily resolved gateway proxy, injected by the bean name "rsocketUpperCaseRequestFlow.gateway".
    @Lazy
    @Autowired
    @Qualifier("rsocketUpperCaseRequestFlow.gateway")
    private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;

    /**
     * Handles GET requests for "hello" by passing the fixed letters a, b, c, d to the
     * gateway function and returning its result with the text/event-stream media type.
     *
     * @return the flux produced by the gateway function
     */
    @GetMapping(path = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> uppercase() {
        Flux<String> letters = Flux.just("a", "b", "c", "d");
        return this.rsocketUpperCaseFlowFunction.apply(letters);
    }
}

当我运行应用程序时,我遇到了这样的异常。

org.springframework.messaging.MessageDeliveryException: Destination '/uppercase' does not support REQUEST_CHANNEL. Supported interaction(s): [SETUP, METADATA_PUSH]
    at org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler.handleNoMatch(RSocketMessageHandler.java:389)
    at org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.getHandlerMethod(AbstractMethodMessageHandler.java:476)
    at org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:444)
    at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.lambda$handleAndReply(MessagingRSocket.java:173)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4105)
    at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:207)
    at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8174)
    at reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstInner.onNext(FluxSwitchOnFirst.java:180)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
    at reactor.core.publisher.UnicastProcessor.drainRegular(UnicastProcessor.java:240)
    at reactor.core.publisher.UnicastProcessor.drain(UnicastProcessor.java:312)
    at reactor.core.publisher.UnicastProcessor.subscribe(UnicastProcessor.java:427)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8174)
    at io.rsocket.internal.RateLimitableRequestPublisher.subscribe(RateLimitableRequestPublisher.java:74)
    at io.rsocket.RSocketResponder.handleStream(RSocketResponder.java:446)
    at io.rsocket.RSocketResponder.handleChannel(RSocketResponder.java:502)
    at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:315)
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8174)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637)
    at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317)
    at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new(ClientServerInputMultiplexer.java:116)
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
    at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380)
    at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316)
    at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)


java.lang.AssertionError: expectation "expectNext(A)" failed (expected: onNext(A); actual: onError(io.rsocket.exceptions.ApplicationErrorException: Destination '/uppercase' does not support REQUEST_CHANNEL. Supported interaction(s): [SETUP, METADATA_PUSH]))

    at reactor.test.MessageFormatter.assertionError(MessageFormatter.java:115)
    at reactor.test.MessageFormatter.failPrefix(MessageFormatter.java:104)
    at reactor.test.MessageFormatter.fail(MessageFormatter.java:73)
    at reactor.test.MessageFormatter.failOptional(MessageFormatter.java:88)
    at reactor.test.DefaultStepVerifierBuilder.lambda$addExpectedValue(DefaultStepVerifierBuilder.java:501)
    at reactor.test.DefaultStepVerifierBuilder$SignalEvent.test(DefaultStepVerifierBuilder.java:2211)
    at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSignal(DefaultStepVerifierBuilder.java:1483)
    at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onExpectation(DefaultStepVerifierBuilder.java:1431)
    at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onError(DefaultStepVerifierBuilder.java:1091)
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onError(FluxDoFinally.java:129)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:227)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:227)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:227)
    at reactor.core.publisher.UnicastProcessor.checkTerminated(UnicastProcessor.java:334)
    at reactor.core.publisher.UnicastProcessor.drainRegular(UnicastProcessor.java:232)
    at reactor.core.publisher.UnicastProcessor.drain(UnicastProcessor.java:312)
    at reactor.core.publisher.UnicastProcessor.onError(UnicastProcessor.java:401)
    at io.rsocket.RSocketRequester.handleFrame(RSocketRequester.java:556)
    at io.rsocket.RSocketRequester.handleIncomingFrames(RSocketRequester.java:516)
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onNext(FluxGroupBy.java:670)
    at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:205)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:308)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:422)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)
    Suppressed: io.rsocket.exceptions.ApplicationErrorException: Destination '/uppercase' does not support REQUEST_CHANNEL. Supported interaction(s): [SETUP, METADATA_PUSH]
        at io.rsocket.exceptions.Exceptions.from(Exceptions.java:45)
        ... 37 more

我刚刚从 https://start.spring.io 创建了一个新项目,并使用了您的全部代码,在我这里运行正常。我使用的是 Spring Boot 2.2.0。

不过我发现,Spring Boot 的 RSocket 自动配置与我们的 ServerRSocketConnector 之间没有任何关联。

我们需要以某种方式让 ServerRSocketConnector 基于 Spring Boot 为 RSocket 支持所自动配置的内容来工作。欢迎就此事向 Spring Boot 提交 issue!然后我们会尝试弄清楚需要在 Spring Integration 中实现什么,以及如何实现。