来自一个客户端的并行请求在 RSocket 中串行处理

Parallel requests from one client processed in series in RSocket

我希望服务器的所有调用都将并行处理,但事实并非如此。 这是一个简单的例子。

RSocket 版本:1.1.0

服务器

public class ServerApp {
    private static final Logger log = LoggerFactory.getLogger(ServerApp.class);

    public static void main(String[] args) throws InterruptedException {
        RSocketServer.create(SocketAcceptor.forRequestResponse(payload ->
                Mono.fromCallable(() -> {
                    log.debug("Start of my business logic");
                    sleepSeconds(5);
                    return DefaultPayload.create("OK");
                })))
                .bind(WebsocketServerTransport.create(15000))
                .block();
        log.debug("Server started");
        TimeUnit.MINUTES.sleep(30);
    }

    private static void sleepSeconds(int sec) {
        try {
            TimeUnit.SECONDS.sleep(sec);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

客户端

public class ClientApp {
    private static final Logger log = LoggerFactory.getLogger(ClientApp.class);

    public static void main(String[] args) throws InterruptedException {
        RSocket client = RSocketConnector.create()
                .connect(WebsocketClientTransport.create(15000))
                .block();

        long start1 = System.currentTimeMillis();
        client.requestResponse(DefaultPayload.create("Request 1"))
                .doOnNext(r -> log.debug("finished within {}ms", System.currentTimeMillis() - start1))
                .subscribe();

        long start2 = System.currentTimeMillis();
        client.requestResponse(DefaultPayload.create("Request 2"))
                .doOnNext(r -> log.debug("finished within {}ms", System.currentTimeMillis() - start2))
                .subscribe();

        TimeUnit.SECONDS.sleep(20);
    }
}

在客户端日志中,我们可以看到两个请求同时发送,并且在 10 秒后同时收到两个响应(每个请求在 5 秒内处理)。

在服务器日志中,我们可以看到请求是按顺序执行的,而不是并行执行的。

你能帮我理解这种行为吗?

  1. 为什么我们在 10 秒而不是 5 秒后收到第一个响应?
  2. 如果我希望并行处理所有请求,如何正确创建服务器?

如果我将 Mono.fromCallable 替换为 Mono.fromFuture(CompletableFuture.supplyAsync(() -> myBusinessLogic(), executorService)),那么它将解析 1.

如果我将 Mono.fromCallable 替换为 Mono.delay(Duration.ZERO).map(ignore -> myBusinessLogic(),那么它将解析 1. 和 2.

如果我用 Mono.create(sink -> sink.success(myBusinessLogic())) 替换 Mono.fromCallable,那么它不会解决我的问题。

客户端日志:

2021-07-16 10:39:46,880 DEBUG [reactor-tcp-nio-1] [/] - sending -> 
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 56
Data:

2021-07-16 10:39:46,952 DEBUG [main] [/] - sending -> 
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b0 Length: 15
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 52 65 71 75 65 73 74 20 31                      |Request 1       |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:46,957 DEBUG [main] [/] - sending -> 
Frame => Stream ID: 3 Type: REQUEST_RESPONSE Flags: 0b0 Length: 15
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 52 65 71 75 65 73 74 20 32                      |Request 2       |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:57,043 DEBUG [reactor-tcp-nio-1] [/] - receiving -> 
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 8
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 4f 4b                                           |OK              |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:57,046 DEBUG [reactor-tcp-nio-1] [/] - finished within 10120ms
2021-07-16 10:39:57,046 DEBUG [reactor-tcp-nio-1] [/] - receiving -> 
Frame => Stream ID: 3 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 8
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 4f 4b                                           |OK              |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:57,046 DEBUG [reactor-tcp-nio-1] [/] - finished within 10094ms

服务器日志:

2021-07-16 10:39:46,965 DEBUG [reactor-http-nio-2] [/] - receiving -> 
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 56
Data:

2021-07-16 10:39:47,021 DEBUG [reactor-http-nio-2] [/] - receiving -> 
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b0 Length: 15
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 52 65 71 75 65 73 74 20 31                      |Request 1       |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:47,027 DEBUG [reactor-http-nio-2] [/] - Start of my business logic
2021-07-16 10:39:52,037 DEBUG [reactor-http-nio-2] [/] - sending -> 
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 8
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 4f 4b                                           |OK              |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:52,038 DEBUG [reactor-http-nio-2] [/] - receiving -> 
Frame => Stream ID: 3 Type: REQUEST_RESPONSE Flags: 0b0 Length: 15
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 52 65 71 75 65 73 74 20 32                      |Request 2       |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:52,038 DEBUG [reactor-http-nio-2] [/] - Start of my business logic
2021-07-16 10:39:57,039 DEBUG [reactor-http-nio-2] [/] - sending -> 
Frame => Stream ID: 3 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 8
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 4f 4b                                           |OK              |
+--------+-------------------------------------------------+----------------+

您不应将异步代码(如 Reactive Mono 操作)与阻塞代码(如

)混合使用
    private static void sleepSeconds(int sec) {
        try {
            TimeUnit.SECONDS.sleep(sec);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

我怀疑这里的核心问题是像 rsocket-java 这样的框架不想 运行 新线程上的所有内容,代价是过多的上下文切换。所以一般靠你运行长运行宁CPU或者适当的IO操作

你应该看看各种异步延迟操作 https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#delayElement-java.time.Duration-

如果您的延迟是为了模拟长时间的 运行ning 操作,那么您应该考虑订阅不同的调度程序,例如 https://projectreactor.io/docs/core/release/api/reactor/core/scheduler/Schedulers.html#boundedElastic--