RSocket 和 Spring 不处理多个请求
RSocket and Spring not handle multiple requests
我和 RSocket 一起玩 Spring 引导。我想做一个简单的请求-响应示例。作为示例,我从 link:
中获取了代码
https://www.baeldung.com/spring-boot-rsocket#request-response
源代码:
当我 运行 没有更改的示例代码时,我在请求期间收到异常错误。这个错误不是这个问题的重点,但我只想显示 baeldung 对原始来源的更改。
[reactor-tcp-nio-1]
org.springframework.core.log.CompositeLog: [5927a44d-9] 500 Server
Error for HTTP GET "/current/pko"
io.rsocket.exceptions.ApplicationErrorException: No handler for
destination '' at
io.rsocket.exceptions.Exceptions.from(Exceptions.java:76) Suppressed:
reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has
been observed at the following site(s): |_ checkpoint ⇢ Handler
com.baeldung.spring.rsocket.client.MarketDataRestController#current(String)
[DispatcherHandler] |_ checkpoint ⇢ HTTP GET "/current/pko"
[ExceptionHandlingWebHandler] Stack trace: at
io.rsocket.exceptions.Exceptions.from(Exceptions.java:76) at
io.rsocket.core.RSocketRequester.handleFrame(RSocketRequester.java:706)
at
io.rsocket.core.RSocketRequester.handleIncomingFrames(RSocketRequester.java:640)
at
reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at
reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
at
reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
at
reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
at
reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onNext(FluxGroupBy.java:670)
at
reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:205)
at
reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:112)
at
reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213)
at
reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213)
at
reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:260)
at
reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:366)
at
reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358)
at
reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at
io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
at
io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
所以我更改了客户端代码
@Configuration
public class ClientConfiguration {
@Bean
public RSocket rSocket() {
return RSocketFactory.connect()
.mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(TcpClientTransport.create(7000))
.start()
.block();
}
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
}
}
到
@Configuration
public class ClientConfiguration {
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.builder()
.rsocketStrategies(rSocketStrategies)
.connectTcp("localhost", 7000)
.block();
}
}
这个小改动有帮助,不会出现异常。该问题的另一个问题是来自客户端(请求者)的请求由服务器(响应者)一个一个地处理。我在 2 个线程中创建 SOAPUI REST 项目和 运行 GET 请求。看起来服务器使用单线程。这不是我期望的结果。
为了简单起见,我将展示整个解决方案。
服务器:
简单控制器
@Controller
public class MarketDataRSocketController {
Logger logger = LoggerFactory.getLogger(MarketDataRSocketController.class);
private final MarketDataRepository marketDataRepository;
public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
this.marketDataRepository = marketDataRepository;
}
@MessageMapping("currentMarketData")
public Mono<MarketData> currentMarketData(MarketDataRequest marketDataRequest) {
logger.info("Getting data for: "+marketDataRequest);
Mono<MarketData> result = marketDataRepository.getOne(marketDataRequest.getStock());
logger.info("Controller thread move forward: "+marketDataRequest);
return result;
}
@MessageExceptionHandler
public Mono<MarketData> handleException(Exception e) {
return Mono.just(MarketData.fromException(e));
}
}
在存储库中我添加 Thread.sleep(10000);
只是为了模拟长时间的 运行ning 操作。
@Component
public class MarketDataRepository {
Logger logger = LoggerFactory.getLogger(MarketDataRSocketController.class);
private static final int BOUND = 100;
private Random random = new Random();
public Mono<MarketData> getOne(String stock) {
//return return Mono.just(getMarketDataResponse(stock)); original code from baeldung.
return Mono.just(stock).map(s -> getMarketDataResponse(s));
}
private MarketData getMarketDataResponse(String stock) {
logger.info("Repository thread go speel ZzzZZ");
try {
Thread.sleep(10000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
logger.info("Repository thread move forward");
return new MarketData(stock, random.nextInt(BOUND));
}
}
客户
简单的客户端配置:
@Configuration
public class ClientConfiguration {
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.builder()
.rsocketStrategies(rSocketStrategies)
.connectTcp("localhost", 7000)
.block();
}
}
以及我在 SOAP 中使用的简单 REST 控制器 UI
@RestController
public class MarketDataRestController {
Logger logger = LoggerFactory.getLogger(MarketDataRestController.class);
private final Random random = new Random();
private final RSocketRequester rSocketRequester;
public MarketDataRestController(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
@GetMapping(value = "/current/{stock}")
public Publisher<MarketData> current(@PathVariable("stock") String stock) {
logger.info("Get REST call for stock : "+stock);
return rSocketRequester.route("currentMarketData")
.data(new MarketDataRequest(stock))
.retrieveMono(MarketData.class);
}
}
当我 运行 服务器和客户端时,我会遇到难以理解的行为。通过 SOAP UI 我在 2 个线程中发出单个请求。
在客户端日志中我得到:
2021-09-01 11:30:14,614 INFO [reactor-http-nio-2] com.baeldung.spring.rsocket.client.MarketDataRestController: Get REST call for stock : pko
2021-09-01 11:30:14,691 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.client.MarketDataRestController: Get REST call for stock : pko
在服务器中我得到如下日志:
第一次拍摄的记录:
// get data from client
2021-09-01 11:30:14,843 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: Getting data for: MarketDataRequest(stock=pko)
// Log that Contoller thread go forward after call repository
2021-09-01 11:30:14,844 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: Controller thread move forward: MarketDataRequest(stock=pko)
// Log that repository sleep thread
2021-09-01 11:30:14,862 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: Repository thread go speel ZzzZZ
// Repository finish work
2021-09-01 11:30:24,863 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: Repository thread move forward
服务器仅处理单个调用,等待存储库完成作业。然后以类似的方式处理下一个请求:
2021-09-01 11:30:24,874 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: Getting data for: MarketDataRequest(stock=pko)
2021-09-01 11:30:24,874 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: Controller thread move forward: MarketDataRequest(stock=pko)
2021-09-01 11:30:24,874 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: Repository thread go speel ZzzZZ
2021-09-01 11:30:34,876 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: Repository thread move forward
我不明白为什么服务器要一个一个处理调用。也许代码中存在一些问题,或者我可能不理解正确的内容。
提前谢谢你。
在 Reactor 中,默认情况下,一切都在主线程上 运行。调用 Thread.sleep
主线程阻塞,应用程序冻结。如果您想模拟一个 long-运行 操作,您可以使用 delayElements 运算符:
.delayElements(Duration.ofSeconds(10));
注意:Reactor BlockHound 检测并报告此类阻塞调用。
我和 RSocket 一起玩 Spring 引导。我想做一个简单的请求-响应示例。作为示例,我从 link:
中获取了代码https://www.baeldung.com/spring-boot-rsocket#request-response
源代码:
当我 运行 没有更改的示例代码时,我在请求期间收到异常错误。这个错误不是这个问题的重点,但我只想显示 baeldung 对原始来源的更改。
[reactor-tcp-nio-1] org.springframework.core.log.CompositeLog: [5927a44d-9] 500 Server Error for HTTP GET "/current/pko" io.rsocket.exceptions.ApplicationErrorException: No handler for destination '' at io.rsocket.exceptions.Exceptions.from(Exceptions.java:76) Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): |_ checkpoint ⇢ Handler com.baeldung.spring.rsocket.client.MarketDataRestController#current(String) [DispatcherHandler] |_ checkpoint ⇢ HTTP GET "/current/pko" [ExceptionHandlingWebHandler] Stack trace: at io.rsocket.exceptions.Exceptions.from(Exceptions.java:76) at io.rsocket.core.RSocketRequester.handleFrame(RSocketRequester.java:706) at io.rsocket.core.RSocketRequester.handleIncomingFrames(RSocketRequester.java:640) at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onNext(FluxGroupBy.java:670) at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:205) at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:112) at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213) at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213) at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:260) at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:366) at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358) at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:834)
所以我更改了客户端代码
@Configuration
public class ClientConfiguration {
@Bean
public RSocket rSocket() {
return RSocketFactory.connect()
.mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(TcpClientTransport.create(7000))
.start()
.block();
}
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
}
}
到
@Configuration
public class ClientConfiguration {
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.builder()
.rsocketStrategies(rSocketStrategies)
.connectTcp("localhost", 7000)
.block();
}
}
这个小改动有帮助,不会出现异常。该问题的另一个问题是来自客户端(请求者)的请求由服务器(响应者)一个一个地处理。我在 2 个线程中创建 SOAPUI REST 项目和 运行 GET 请求。看起来服务器使用单线程。这不是我期望的结果。
为了简单起见,我将展示整个解决方案。
服务器:
简单控制器
@Controller
public class MarketDataRSocketController {
Logger logger = LoggerFactory.getLogger(MarketDataRSocketController.class);
private final MarketDataRepository marketDataRepository;
public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
this.marketDataRepository = marketDataRepository;
}
@MessageMapping("currentMarketData")
public Mono<MarketData> currentMarketData(MarketDataRequest marketDataRequest) {
logger.info("Getting data for: "+marketDataRequest);
Mono<MarketData> result = marketDataRepository.getOne(marketDataRequest.getStock());
logger.info("Controller thread move forward: "+marketDataRequest);
return result;
}
@MessageExceptionHandler
public Mono<MarketData> handleException(Exception e) {
return Mono.just(MarketData.fromException(e));
}
}
在存储库中我添加 Thread.sleep(10000);
只是为了模拟长时间的 运行ning 操作。
@Component
public class MarketDataRepository {
Logger logger = LoggerFactory.getLogger(MarketDataRSocketController.class);
private static final int BOUND = 100;
private Random random = new Random();
public Mono<MarketData> getOne(String stock) {
//return return Mono.just(getMarketDataResponse(stock)); original code from baeldung.
return Mono.just(stock).map(s -> getMarketDataResponse(s));
}
private MarketData getMarketDataResponse(String stock) {
logger.info("Repository thread go speel ZzzZZ");
try {
Thread.sleep(10000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
logger.info("Repository thread move forward");
return new MarketData(stock, random.nextInt(BOUND));
}
}
客户
简单的客户端配置:
@Configuration
public class ClientConfiguration {
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.builder()
.rsocketStrategies(rSocketStrategies)
.connectTcp("localhost", 7000)
.block();
}
}
以及我在 SOAP 中使用的简单 REST 控制器 UI
@RestController
public class MarketDataRestController {
Logger logger = LoggerFactory.getLogger(MarketDataRestController.class);
private final Random random = new Random();
private final RSocketRequester rSocketRequester;
public MarketDataRestController(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
@GetMapping(value = "/current/{stock}")
public Publisher<MarketData> current(@PathVariable("stock") String stock) {
logger.info("Get REST call for stock : "+stock);
return rSocketRequester.route("currentMarketData")
.data(new MarketDataRequest(stock))
.retrieveMono(MarketData.class);
}
}
当我 运行 服务器和客户端时,我会遇到难以理解的行为。通过 SOAP UI 我在 2 个线程中发出单个请求。
在客户端日志中我得到:
2021-09-01 11:30:14,614 INFO [reactor-http-nio-2] com.baeldung.spring.rsocket.client.MarketDataRestController: Get REST call for stock : pko
2021-09-01 11:30:14,691 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.client.MarketDataRestController: Get REST call for stock : pko
在服务器中我得到如下日志:
第一次拍摄的记录:
// get data from client
2021-09-01 11:30:14,843 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: Getting data for: MarketDataRequest(stock=pko)
// Log that Contoller thread go forward after call repository
2021-09-01 11:30:14,844 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: Controller thread move forward: MarketDataRequest(stock=pko)
// Log that repository sleep thread
2021-09-01 11:30:14,862 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: Repository thread go speel ZzzZZ
// Repository finish work
2021-09-01 11:30:24,863 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: Repository thread move forward
服务器仅处理单个调用,等待存储库完成作业。然后以类似的方式处理下一个请求:
2021-09-01 11:30:24,874 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: Getting data for: MarketDataRequest(stock=pko)
2021-09-01 11:30:24,874 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: Controller thread move forward: MarketDataRequest(stock=pko)
2021-09-01 11:30:24,874 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: Repository thread go speel ZzzZZ
2021-09-01 11:30:34,876 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: Repository thread move forward
我不明白为什么服务器要一个一个处理调用。也许代码中存在一些问题,或者我可能不理解正确的内容。 提前谢谢你。
在 Reactor 中,默认情况下,一切都在主线程上 运行。调用 Thread.sleep
主线程阻塞,应用程序冻结。如果您想模拟一个 long-运行 操作,您可以使用 delayElements 运算符:
.delayElements(Duration.ofSeconds(10));
注意:Reactor BlockHound 检测并报告此类阻塞调用。