RSocket and Spring do not handle multiple requests

I'm experimenting with Spring Boot and RSocket. I want to build a simple request-response example. As a starting point, I took the code from this link:

https://www.baeldung.com/spring-boot-rsocket#request-response

Source code:

https://github.com/eugenp/tutorials/tree/master/spring-5-webflux/src/main/java/com/baeldung/spring/rsocket

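When I run the sample code unchanged, I get an exception during the request. This error is not the focus of this question; I only include it to show what I changed relative to the original Baeldung source.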

[reactor-tcp-nio-1] org.springframework.core.log.CompositeLog: [5927a44d-9] 500 Server Error for HTTP GET "/current/pko"
io.rsocket.exceptions.ApplicationErrorException: No handler for destination ''
    at io.rsocket.exceptions.Exceptions.from(Exceptions.java:76)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
    |_ checkpoint ⇢ Handler com.baeldung.spring.rsocket.client.MarketDataRestController#current(String) [DispatcherHandler]
    |_ checkpoint ⇢ HTTP GET "/current/pko" [ExceptionHandlingWebHandler]
Stack trace:
        at io.rsocket.exceptions.Exceptions.from(Exceptions.java:76)
        at io.rsocket.core.RSocketRequester.handleFrame(RSocketRequester.java:706)
        at io.rsocket.core.RSocketRequester.handleIncomingFrames(RSocketRequester.java:640)
        at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
        at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
        at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
        at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onNext(FluxGroupBy.java:670)
        at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:205)
        at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:112)
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213)
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213)
        at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:260)
        at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:366)
        at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)

So I changed the client code. The original configuration is shown first, followed by my replacement:

@Configuration
public class ClientConfiguration {

    @Bean
    public RSocket rSocket() {
        return RSocketFactory.connect()
                             .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
                             .frameDecoder(PayloadDecoder.ZERO_COPY)
                             .transport(TcpClientTransport.create(7000))
                             .start()
                             .block();
    }

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
    }
}

@Configuration
public class ClientConfiguration {
    

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.builder()
                .rsocketStrategies(rSocketStrategies)
                .connectTcp("localhost", 7000)
                .block();
    }
}

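This small change helped: the exception no longer occurs. The remaining problem, and the subject of this question, is that requests from the client (requester) are processed by the server (responder) one at a time. I created a SoapUI REST project and ran the GET request in 2 threads. It looks like the server uses a single thread, which is not what I expected.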

For simplicity, I'll show the whole solution.

Server:

A simple controller:

@Controller
public class MarketDataRSocketController {

    Logger logger = LoggerFactory.getLogger(MarketDataRSocketController.class);

    private final MarketDataRepository marketDataRepository;

    public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
        this.marketDataRepository = marketDataRepository;
    }

    @MessageMapping("currentMarketData")
    public Mono<MarketData> currentMarketData(MarketDataRequest marketDataRequest) {
        logger.info("Getting data for: "+marketDataRequest);
        Mono<MarketData> result = marketDataRepository.getOne(marketDataRequest.getStock());
        logger.info("Controller thread move forward: "+marketDataRequest);
        return result;
    }

    @MessageExceptionHandler
    public Mono<MarketData> handleException(Exception e) {
        return Mono.just(MarketData.fromException(e));
    }
}

In the repository I added Thread.sleep(10000); just to simulate a long-running operation.

@Component
public class MarketDataRepository {
    Logger logger = LoggerFactory.getLogger(MarketDataRepository.class);
    private static final int BOUND = 100;
    private Random random = new Random();

    public Mono<MarketData> getOne(String stock) {
        // return Mono.just(getMarketDataResponse(stock)); // original code from Baeldung
        return Mono.just(stock).map(s -> getMarketDataResponse(s));
    }

    private MarketData getMarketDataResponse(String stock) {
        logger.info("Repository thread go speel ZzzZZ");
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        logger.info("Repository thread move forward");
        return new MarketData(stock, random.nextInt(BOUND));
    }
}

Client:

A simple client configuration:

@Configuration
public class ClientConfiguration {


    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.builder()
                .rsocketStrategies(rSocketStrategies)
                .connectTcp("localhost", 7000)
                .block();
    }
}

And the simple REST controller that I call from SoapUI:

@RestController
public class MarketDataRestController {
    Logger logger = LoggerFactory.getLogger(MarketDataRestController.class);

    private final Random random = new Random();
    private final RSocketRequester rSocketRequester;

    public MarketDataRestController(RSocketRequester rSocketRequester) {
        this.rSocketRequester = rSocketRequester;
    }

    @GetMapping(value = "/current/{stock}")
    public Publisher<MarketData> current(@PathVariable("stock") String stock) {
        logger.info("Get REST call for stock : "+stock);
        return rSocketRequester.route("currentMarketData")
                               .data(new MarketDataRequest(stock))
                               .retrieveMono(MarketData.class);
    }
}

When I run the server and the client, I get behavior I don't understand. Through SoapUI I send a single request from each of 2 threads.

In the client log I get:

2021-09-01 11:30:14,614 INFO [reactor-http-nio-2] com.baeldung.spring.rsocket.client.MarketDataRestController: Get REST call for stock : pko
2021-09-01 11:30:14,691 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.client.MarketDataRestController: Get REST call for stock : pko

On the server I get logs like these:

Log for the first request:

// get data from client
2021-09-01 11:30:14,843 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: Getting data for: MarketDataRequest(stock=pko)
// Log that the controller thread moves forward after calling the repository
2021-09-01 11:30:14,844 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: Controller thread move forward: MarketDataRequest(stock=pko)
// Log that the repository puts the thread to sleep
2021-09-01 11:30:14,862 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: Repository thread go speel ZzzZZ
// Repository finishes its work
2021-09-01 11:30:24,863 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: Repository thread move forward

The server handles only a single call and waits for the repository to finish its job. Then the next request is handled in the same way:

2021-09-01 11:30:24,874 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: Getting data for: MarketDataRequest(stock=pko)
2021-09-01 11:30:24,874 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: Controller thread move forward: MarketDataRequest(stock=pko)
2021-09-01 11:30:24,874 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: Repository thread go speel ZzzZZ
2021-09-01 11:30:34,876 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: Repository thread move forward

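I don't understand why the server handles the calls one by one. Maybe there is a problem in the code, or maybe I'm misunderstanding something. Thanks in advance.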

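In Reactor, by default, everything runs on the thread that subscribes; nothing switches threads unless you ask for it. Here that appears to be the event-loop thread (reactor-http-nio-3 in the server log), so calling Thread.sleep blocks it and the next request has to wait until the sleep returns. If you want to simulate a long-running operation, use a delay operator instead. On a Mono that is delayElement (delayElements is the Flux variant):

.delayElement(Duration.ofSeconds(10));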
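
As a rough illustration of why sleeping and delaying behave so differently, here is a plain-JDK analogy (not Reactor or RSocket code). A single-threaded ScheduledExecutorService stands in for the event loop, and the names EventLoopModel, blockingMillis and timerMillis are made up for this sketch. Sleeping on the thread makes two requests take about twice the delay, while registering a timer, which is roughly what a delay operator does, lets both finish after about one delay.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Plain-JDK model: one thread plays the role of the event loop.
class EventLoopModel {

    // Each "request" parks the single thread with Thread.sleep,
    // so the requests run back to back.
    static long blockingMillis(int requests, long delayMs) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            long start = System.nanoTime();
            CompletableFuture<?>[] pending = new CompletableFuture<?>[requests];
            for (int i = 0; i < requests; i++) {
                pending[i] = CompletableFuture.runAsync(() -> sleep(delayMs), loop);
            }
            CompletableFuture.allOf(pending).join();
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            loop.shutdownNow();
        }
    }

    // Each "request" only registers a timer; the thread stays free
    // while the timers are pending, so the delays overlap.
    static long timerMillis(int requests, long delayMs) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            long start = System.nanoTime();
            CompletableFuture<?>[] pending = new CompletableFuture<?>[requests];
            for (int i = 0; i < requests; i++) {
                CompletableFuture<Void> done = new CompletableFuture<>();
                loop.schedule(() -> { done.complete(null); }, delayMs, TimeUnit.MILLISECONDS);
                pending[i] = done;
            }
            CompletableFuture.allOf(pending).join();
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            loop.shutdownNow();
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking: " + blockingMillis(2, 400) + " ms");
        System.out.println("timers:   " + timerMillis(2, 400) + " ms");
    }
}
```

The exact numbers depend on the machine, but the blocking variant should report roughly double the time of the timer variant, which matches the 10-second gap between the two requests in the server log above.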

Note: Reactor BlockHound detects and reports such blocking calls.
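
If the slow part is real blocking work rather than a simulated delay, a common approach is to wrap it in Mono.fromCallable and move it off the event loop with subscribeOn(Schedulers.boundedElastic()). The following is a minimal sketch, assuming reactor-core is on the classpath. OffloadSketch, slowLookup, lookup and elapsedForTwoLookups are hypothetical names; slowLookup stands in for the repository's getMarketDataResponse.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class OffloadSketch {

    // Hypothetical stand-in for a call that really blocks
    // (JDBC, file I/O, a legacy client).
    static int slowLookup(String stock) {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return stock.length();
    }

    // fromCallable defers the blocking call until subscription;
    // subscribeOn makes that subscription happen on a boundedElastic
    // worker instead of the caller's (event-loop) thread.
    static Mono<Integer> lookup(String stock) {
        return Mono.fromCallable(() -> slowLookup(stock))
                   .subscribeOn(Schedulers.boundedElastic());
    }

    // Runs two lookups at the same time and returns the elapsed milliseconds.
    static long elapsedForTwoLookups() {
        long start = System.nanoTime();
        Mono.zip(lookup("pko"), lookup("abc")).block();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("two offloaded lookups took " + elapsedForTwoLookups() + " ms");
    }
}
```

Applied to the repository in the question, this would presumably mean returning Mono.fromCallable(() -> getMarketDataResponse(stock)).subscribeOn(Schedulers.boundedElastic()) from getOne. I have not run that against the original project, so treat it as a starting point.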