Rsocket 服务器异常:没有目标 '' 的处理程序(目标不从客户端传递到服务器)
Rsocket Server exception: No handler for destination '' (destination does not pass from client to server)
我为 RSocket 消息写了一个小演示
问题是我无法访问 Rsocket
端点,
我从服务器收到以下异常:
客户端:
配置:
@Bean
RSocket rSocket() {
return RSocketFactory.connect()
.mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(TcpClientTransport.create(new InetSocketAddress(7500)))
.start()
.block();
}
@Bean
RSocketRequester requester(RSocketStrategies strategies) {
return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, strategies);
}
控制器:
private final RSocketRequester requester;
@GetMapping("/greet/{name}")
public Publisher<GreetingsResponse> greet(@PathVariable String name) {
return requester
.route("hello")
.data(new GreetingsRequest(name))
.retrieveMono(GreetingsResponse.class);
}
服务器端(使用spring Rsocket):
yml:
spring:
rsocket:
server:
port: 7500
transport: tcp
main:
lazy-initialization: true
配置:
@MessageMapping("hello")
Mono<GreetingsResponse> greet(GreetingsRequest request) {
return Mono.just(new GreetingsResponse("Hello " + request.getName() + " @ " + Instant.now()));
}
我很确定它与新的 wrap
函数有关,RSocketRequester.wrap
因为它接受一个新参数 metadataMimeType
,我将它设置为 application/Json,
但它似乎不起作用
堆栈跟踪:
org.springframework.messaging.MessageDeliveryException: No handler for
destination '' at
org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler.handleNoMatch(RSocketMessageHandler.java:312)
at
org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.getHandlerMethod(AbstractMethodMessageHandler.java:445)
at
org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:417)
at
org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.lambda$handleAndReply(MessagingRSocket.java:173)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44) at
reactor.core.publisher.Mono.subscribe(Mono.java:3920) at
reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:207)
at
reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
at
reactor.core.publisher.MonoFromFluxOperator.subscribe(MonoFromFluxOperator.java:74)
at
io.rsocket.RSocketResponder.handleRequestResponse(RSocketResponder.java:386)
at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:298)
at
reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at
reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:238)
at
reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
at
reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
at
reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696)
at reactor.core.publisher.Flux.subscribe(Flux.java:8000) at
reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:184)
at
reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1582)
at
reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:316)
at
io.rsocket.internal.ClientServerInputMultiplexer.lambda$new(ClientServerInputMultiplexer.java:116)
at
reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at
reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380)
at
reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316)
at
reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201)
at
reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at
reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at
reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:206)
at
reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:322)
at
reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:342)
at
reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:91)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511) at
io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:918)
at
io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
您使用的是哪个 spring 版本?
我遇到了同样的问题,我通过更改 spring-boot-starter-parent 2.2.0.M3.
解决了它
这是我的来源
https://github.com/han1448/spring-rsocket-example
已添加。
我解决了这个问题。您需要将 mimeType 更改为 message/x.rsocket.routing.v0
。
您可以从 MetadataExtractor.ROUTING
.
获取此 mimeType
@Bean
RSocket rSocket() {
return RSocketFactory.connect()
.mimeType(MetadataExtractor.ROUTING.toString(), MimeTypeUtils.APPLICATION_JSON_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(TcpClientTransport.create(new InetSocketAddress(7500)))
.start()
.block();
}
@Bean
RSocketRequester requester(RSocketStrategies strategies) {
return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MetadataExtractor.ROUTING, strategies);
}
对于RSocketRequester
,方法名从create到wrap,然后添加第三个参数。所以,我觉得现在还不稳定。
我的选择是使用builder方法,如下图
@Bean
RSocketRequester rSocketRequester(RSocketStrategies strategies) {
InetSocketAddress address = new InetSocketAddress(clientConfigProp.getHost(), clientConfigProp.getPort());
log.info("RSocket server address={}", address);
return RSocketRequester.builder()
.rsocketFactory(factory -> factory
.dataMimeType(MimeTypeUtils.ALL_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY))
.rsocketStrategies(strategies)
.connect(TcpClientTransport.create(address))
.retry().block();
}
我为 RSocket 消息写了一个小演示
问题是我无法访问 Rsocket
端点,
我从服务器收到以下异常:
客户端: 配置:
@Bean
RSocket rSocket() {
return RSocketFactory.connect()
.mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(TcpClientTransport.create(new InetSocketAddress(7500)))
.start()
.block();
}
@Bean
RSocketRequester requester(RSocketStrategies strategies) {
return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, strategies);
}
控制器:
private final RSocketRequester requester;
@GetMapping("/greet/{name}")
public Publisher<GreetingsResponse> greet(@PathVariable String name) {
return requester
.route("hello")
.data(new GreetingsRequest(name))
.retrieveMono(GreetingsResponse.class);
}
服务器端(使用spring Rsocket): yml:
spring:
rsocket:
server:
port: 7500
transport: tcp
main:
lazy-initialization: true
配置:
@MessageMapping("hello")
Mono<GreetingsResponse> greet(GreetingsRequest request) {
return Mono.just(new GreetingsResponse("Hello " + request.getName() + " @ " + Instant.now()));
}
我很确定它与新的 wrap
函数有关,RSocketRequester.wrap
因为它接受一个新参数 metadataMimeType
,我将它设置为 application/Json,
但它似乎不起作用
堆栈跟踪:
org.springframework.messaging.MessageDeliveryException: No handler for destination '' at org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler.handleNoMatch(RSocketMessageHandler.java:312) at org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.getHandlerMethod(AbstractMethodMessageHandler.java:445) at org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:417) at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.lambda$handleAndReply(MessagingRSocket.java:173) at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44) at reactor.core.publisher.Mono.subscribe(Mono.java:3920) at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:207) at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80) at reactor.core.publisher.MonoFromFluxOperator.subscribe(MonoFromFluxOperator.java:74) at io.rsocket.RSocketResponder.handleRequestResponse(RSocketResponder.java:386) at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:298) at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:238) at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696) at reactor.core.publisher.Flux.subscribe(Flux.java:8000) at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:184) at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1582) at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:316) at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new(ClientServerInputMultiplexer.java:116) at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380) at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316) at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201) at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:206) at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:322) at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:342) at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:91) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511) at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:918) at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:834)
您使用的是哪个 spring 版本? 我遇到了同样的问题,我通过更改 spring-boot-starter-parent 2.2.0.M3.
解决了它这是我的来源 https://github.com/han1448/spring-rsocket-example
已添加。
我解决了这个问题。您需要将 mimeType 更改为 message/x.rsocket.routing.v0
。
您可以从 MetadataExtractor.ROUTING
.
@Bean
RSocket rSocket() {
return RSocketFactory.connect()
.mimeType(MetadataExtractor.ROUTING.toString(), MimeTypeUtils.APPLICATION_JSON_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(TcpClientTransport.create(new InetSocketAddress(7500)))
.start()
.block();
}
@Bean
RSocketRequester requester(RSocketStrategies strategies) {
return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MetadataExtractor.ROUTING, strategies);
}
对于RSocketRequester
,方法名从create到wrap,然后添加第三个参数。所以,我觉得现在还不稳定。
我的选择是使用builder方法,如下图
@Bean
RSocketRequester rSocketRequester(RSocketStrategies strategies) {
InetSocketAddress address = new InetSocketAddress(clientConfigProp.getHost(), clientConfigProp.getPort());
log.info("RSocket server address={}", address);
return RSocketRequester.builder()
.rsocketFactory(factory -> factory
.dataMimeType(MimeTypeUtils.ALL_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY))
.rsocketStrategies(strategies)
.connect(TcpClientTransport.create(address))
.retry().block();
}