How do I create RSockets that don't share an underlying network connection in a Spring Boot client app?

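I want to create multiple RSocket connections from a Spring Boot client application to a single server, without them sharing the same underlying network connection.

The use case is load testing: I want to simulate many web clients establishing RSocket connections to my server, and I'd like to create (say) 100 such connections from a single application instance, since that is easier to operate and monitor than running 100 separate processes.

I'm doing something along these lines: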

RSocketRequester requester = rSocketRequesterBuilder
   .dataMimeType(MediaType.APPLICATION_JSON)
   .setupMetadata(jwt, BearerTokenMetadata.BEARER_AUTHENTICATION_MIME_TYPE)
   .rsocketStrategies(builder -> builder.encoder(new BearerTokenAuthenticationEncoder()))
   .websocket(serverUri);
requester.route("my.endpoint").retrieveFlux(byte[].class);

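This works fine for making multiple connections to the server and fetching data. But websocket(), according to its documentation, appears to reuse the same shared underlying connection, which means it does not adequately simulate the load that many independent connections would place on the server.

Any idea how to do this?

I'm using Spring Boot 2.4.4.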

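Update

I now believe I have succeeded in establishing two connections, but my second connection request somehow inherits the first connection's metadata and appends its own to it, rather than carrying only its own separate metadata. The metadata I care about is the JWT.

Here is the server-side logging. It looks to me like the second SETUP frame contains two JWTs, each preceded by message/x.rsocket.authentication.bearer.v0; the first JWT is followed by the second.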

2021-05-03 17:52:20.501 DEBUG 19737 --- [ctor-http-nio-2] reactor.netty.transport.ServerTransport  : [id:98c2fa05, L:/0:0:0:0:0:0:0:0:7080] Bound new server
2021-05-03 17:52:20.506  INFO 19737 --- [           main] o.s.b.rsocket.netty.NettyRSocketServer   : Netty RSocket started on port(s): 7080
2021-05-03 17:52:20.520  INFO 19737 --- [           main] c.p.pano2.rx.RxServicesApplication       : Started RxServicesApplication in 2.616 seconds (JVM running for 3.152)
2021-05-03 17:52:25.799 DEBUG 19737 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id:380596e7, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] New http connection, requesting read
2021-05-03 17:52:25.800 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.transport.TransportConfig  : [id:380596e7, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Initialized pipeline DefaultChannelPipeline{(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2021-05-03 17:52:25.826 DEBUG 19737 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id:380596e7, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Increasing pending responses, now 1
2021-05-03 17:52:25.831 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.ReactorNetty               : [id:380596e7-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Added decoder [PongHandler] at the end of the user pipeline, full pipeline: [reactor.left.httpCodec, reactor.left.httpTrafficHandler, PongHandler, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
2021-05-03 17:52:25.832 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.http.server.HttpServer     : [id:380596e7-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Handler is being applied: io.rsocket.transport.netty.server.WebsocketServerTransport$$Lambda58/0x0000000800720c40@cb577eb
2021-05-03 17:52:25.837 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.ReactorNetty               : [id:380596e7-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Removed handler: reactor.left.httpTrafficHandler, pipeline: DefaultChannelPipeline{(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (PongHandler = io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2021-05-03 17:52:25.837 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.ReactorNetty               : [id:380596e7-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Non Removed handler: reactor.left.accessLogHandler, context: null, pipeline: DefaultChannelPipeline{(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (PongHandler = io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2021-05-03 17:52:25.837 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.ReactorNetty               : [id:380596e7-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Non Removed handler: reactor.left.httpMetricsHandler, context: null, pipeline: DefaultChannelPipeline{(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (PongHandler = io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2021-05-03 17:52:25.870 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.channel.FluxReceive        : [id:380596e7-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] FluxReceive{pending=0, cancelled=false, inboundDone=false, inboundError=null}: subscribing inbound receiver
2021-05-03 17:52:25.893 DEBUG 19737 --- [ctor-http-nio-3] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 0 Type: SETUP Flags: 0b100000000 Length: 300
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 29 6d 65 73 73 61 67 65 2f 78 2e 72 73 6f 63 6b |)message/x.rsock|
|00000010| 65 74 2e 61 75 74 68 65 6e 74 69 63 61 74 69 6f |et.authenticatio|
|00000020| 6e 2e 62 65 61 72 65 72 2e 76 30 00 00 b0 65 79 |n.bearer.v0...ey|
|00000030| 4a 68 62 47 63 69 4f 69 4a 49 55 7a 49 31 4e 69 |JhbGciOiJIUzI1Ni|
|00000040| 4a 39 2e 65 79 4a 71 64 47 6b 69 4f 69 4a 56 55 |J9.eyJqdGkiOiJVU|
|00000050| 30 56 53 4d 53 49 73 49 6d 6c 68 64 43 49 36 4d |0VSMSIsImlhdCI6M|
|00000060| 54 59 79 4d 44 41 33 4f 44 63 30 4e 53 77 69 63 |TYyMDA3ODc0NSwic|
|00000070| 33 56 69 49 6a 6f 69 56 56 4e 46 55 6a 45 69 4c |3ViIjoiVVNFUjEiL|
|00000080| 43 4a 70 63 33 4d 69 4f 69 4a 6d 59 57 74 6c 49 |CJpc3MiOiJmYWtlI|
|00000090| 47 6c 7a 63 33 56 6c 63 69 49 73 49 6d 56 34 63 |Glzc3VlciIsImV4c|
|000000a0| 43 49 36 4d 54 59 79 4d 44 45 32 4e 54 45 30 4e |CI6MTYyMDE2NTE0N|
|000000b0| 58 30 2e 58 33 6e 79 62 6b 4a 44 51 4f 67 36 7a |X0.X3nybkJDQOg6z|
|000000c0| 53 7a 77 39 68 63 68 4d 54 4b 34 6e 71 48 63 34 |Szw9hchMTK4nqHc4|
|000000d0| 4c 6f 47 36 32 73 7a 73 6e 47 58 54 79 77       |LoG62szsnGXTyw  |
+--------+-------------------------------------------------+----------------+
Data:

2021-05-03 17:52:25.929  INFO 19737 --- [ctor-http-nio-3] c.p.p.security.FakeReactiveJwtDecoder    : Decoding jwt 1040895620 subject USER1
2021-05-03 17:52:25.947 DEBUG 19737 --- [ctor-http-nio-3] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 1 Type: REQUEST_STREAM Flags: 0b100000000 Length: 22 InitialRequestN: 9223372036854775807
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 05 04 74 65 73 74                      |.....test       |
+--------+-------------------------------------------------+----------------+
Data:

2021-05-03 17:52:25.965  INFO 19737 --- [ctor-http-nio-3] c.p.pano2.rx.web.SystemController        : Test connection from USER1 null
2021-05-03 17:52:28.981 DEBUG 19737 --- [     parallel-2] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 19
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 20 55 53 45 52 31 20 30          |Hello USER1 0   |
+--------+-------------------------------------------------+----------------+
2021-05-03 17:52:30.736 DEBUG 19737 --- [ctor-http-nio-4] r.n.http.server.HttpServerOperations     : [id:d82b3f3a, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] New http connection, requesting read
2021-05-03 17:52:30.736 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.transport.TransportConfig  : [id:d82b3f3a, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Initialized pipeline DefaultChannelPipeline{(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2021-05-03 17:52:30.751 DEBUG 19737 --- [ctor-http-nio-4] r.n.http.server.HttpServerOperations     : [id:d82b3f3a, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Increasing pending responses, now 1
2021-05-03 17:52:30.751 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.ReactorNetty               : [id:d82b3f3a-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Added decoder [PongHandler] at the end of the user pipeline, full pipeline: [reactor.left.httpCodec, reactor.left.httpTrafficHandler, PongHandler, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
2021-05-03 17:52:30.751 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.http.server.HttpServer     : [id:d82b3f3a-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Handler is being applied: io.rsocket.transport.netty.server.WebsocketServerTransport$$Lambda58/0x0000000800720c40@cb577eb
2021-05-03 17:52:30.751 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.ReactorNetty               : [id:d82b3f3a-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Removed handler: reactor.left.httpTrafficHandler, pipeline: DefaultChannelPipeline{(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (PongHandler = io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2021-05-03 17:52:30.751 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.ReactorNetty               : [id:d82b3f3a-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Non Removed handler: reactor.left.accessLogHandler, context: null, pipeline: DefaultChannelPipeline{(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (PongHandler = io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2021-05-03 17:52:30.751 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.ReactorNetty               : [id:d82b3f3a-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Non Removed handler: reactor.left.httpMetricsHandler, context: null, pipeline: DefaultChannelPipeline{(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (PongHandler = io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2021-05-03 17:52:30.752 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.channel.FluxReceive        : [id:d82b3f3a-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] FluxReceive{pending=0, cancelled=false, inboundDone=false, inboundError=null}: subscribing inbound receiver
2021-05-03 17:52:30.755 DEBUG 19737 --- [ctor-http-nio-4] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 0 Type: SETUP Flags: 0b100000000 Length: 522
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 29 6d 65 73 73 61 67 65 2f 78 2e 72 73 6f 63 6b |)message/x.rsock|
|00000010| 65 74 2e 61 75 74 68 65 6e 74 69 63 61 74 69 6f |et.authenticatio|
|00000020| 6e 2e 62 65 61 72 65 72 2e 76 30 00 00 b0 65 79 |n.bearer.v0...ey|
|00000030| 4a 68 62 47 63 69 4f 69 4a 49 55 7a 49 31 4e 69 |JhbGciOiJIUzI1Ni|
|00000040| 4a 39 2e 65 79 4a 71 64 47 6b 69 4f 69 4a 56 55 |J9.eyJqdGkiOiJVU|
|00000050| 30 56 53 4d 53 49 73 49 6d 6c 68 64 43 49 36 4d |0VSMSIsImlhdCI6M|
|00000060| 54 59 79 4d 44 41 33 4f 44 63 30 4e 53 77 69 63 |TYyMDA3ODc0NSwic|
|00000070| 33 56 69 49 6a 6f 69 56 56 4e 46 55 6a 45 69 4c |3ViIjoiVVNFUjEiL|
|00000080| 43 4a 70 63 33 4d 69 4f 69 4a 6d 59 57 74 6c 49 |CJpc3MiOiJmYWtlI|
|00000090| 47 6c 7a 63 33 56 6c 63 69 49 73 49 6d 56 34 63 |Glzc3VlciIsImV4c|
|000000a0| 43 49 36 4d 54 59 79 4d 44 45 32 4e 54 45 30 4e |CI6MTYyMDE2NTE0N|
|000000b0| 58 30 2e 58 33 6e 79 62 6b 4a 44 51 4f 67 36 7a |X0.X3nybkJDQOg6z|
|000000c0| 53 7a 77 39 68 63 68 4d 54 4b 34 6e 71 48 63 34 |Szw9hchMTK4nqHc4|
|000000d0| 4c 6f 47 36 32 73 7a 73 6e 47 58 54 79 77 29 6d |LoG62szsnGXTyw)m|
|000000e0| 65 73 73 61 67 65 2f 78 2e 72 73 6f 63 6b 65 74 |essage/x.rsocket|
|000000f0| 2e 61 75 74 68 65 6e 74 69 63 61 74 69 6f 6e 2e |.authentication.|
|00000100| 62 65 61 72 65 72 2e 76 30 00 00 b0 65 79 4a 68 |bearer.v0...eyJh|
|00000110| 62 47 63 69 4f 69 4a 49 55 7a 49 31 4e 69 4a 39 |bGciOiJIUzI1NiJ9|
|00000120| 2e 65 79 4a 71 64 47 6b 69 4f 69 4a 56 55 30 56 |.eyJqdGkiOiJVU0V|
|00000130| 53 4d 69 49 73 49 6d 6c 68 64 43 49 36 4d 54 59 |SMiIsImlhdCI6MTY|
|00000140| 79 4d 44 41 33 4f 44 63 31 4d 43 77 69 63 33 56 |yMDA3ODc1MCwic3V|
|00000150| 69 49 6a 6f 69 56 56 4e 46 55 6a 49 69 4c 43 4a |iIjoiVVNFUjIiLCJ|
|00000160| 70 63 33 4d 69 4f 69 4a 6d 59 57 74 6c 49 47 6c |pc3MiOiJmYWtlIGl|
|00000170| 7a 63 33 56 6c 63 69 49 73 49 6d 56 34 63 43 49 |zc3VlciIsImV4cCI|
|00000180| 36 4d 54 59 79 4d 44 45 32 4e 54 45 31 4d 48 30 |6MTYyMDE2NTE1MH0|
|00000190| 2e 4a 33 63 63 52 68 72 31 43 4b 70 45 43 58 66 |.J3ccRhr1CKpECXf|
|000001a0| 45 51 53 31 5a 49 63 71 4f 6d 76 57 31 52 52 4d |EQS1ZIcqOmvW1RRM|
|000001b0| 43 38 67 53 76 4a 76 45 54 44 64 67             |C8gSvJvETDdg    |
+--------+-------------------------------------------------+----------------+
Data:

2021-05-03 17:52:30.757  INFO 19737 --- [ctor-http-nio-4] c.p.p.security.FakeReactiveJwtDecoder    : Decoding jwt 1994845380 subject USER1
2021-05-03 17:52:30.757 DEBUG 19737 --- [ctor-http-nio-4] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 1 Type: REQUEST_STREAM Flags: 0b100000000 Length: 22 InitialRequestN: 9223372036854775807
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 05 04 74 65 73 74                      |.....test       |
+--------+-------------------------------------------------+----------------+
Data:

2021-05-03 17:52:30.758  INFO 19737 --- [ctor-http-nio-4] c.p.pano2.rx.web.SystemController        : Test connection from USER1 null
2021-05-03 17:52:31.967 DEBUG 19737 --- [     parallel-2] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 19
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 20 55 53 45 52 31 20 31          |Hello USER1 1   |
+--------+-------------------------------------------------+----------------+
2021-05-03 17:52:33.765 DEBUG 19737 --- [     parallel-4] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 19
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 20 55 53 45 52 31 20 30          |Hello USER1 0   |
+--------+-------------------------------------------------+----------------+

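I'm having trouble relating this to what I see in the wiretap logs from the client; the encoding there looks different from the frame logger's and is less human-readable, but it does look like the second request is longer than the first. (Logs omitted to stay within the 30k character limit.)

The code I'm using to generate the two client requests is: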

  private void testRequest(String user) {
        String jwt = FakeJwtUtils.createJwt(user, user, 24*60*60*1000L);
        URI uri = config.getUri();
        String baseUrl = uri.getHost() + ":" + uri.getPort();
        HttpClient httpClient = HttpClient.newConnection().baseUrl(baseUrl).wiretap(true);  // newConnection() to avoid shared connection
        WebsocketClientTransport transport = WebsocketClientTransport.create(httpClient, uri.getPath());
        try {
            log.info("Setting up rSocketRequesterBuilder for jwt sub {}: {}", FakeJwtUtils.parseClaimsAsMap(jwt).get("sub"), jwt);
        } catch (Exception ignore) { }
        var requesterBuilder = rSocketRequesterBuilder
                .dataMimeType(MediaType.APPLICATION_JSON)
                .setupMetadata(jwt, BearerTokenMetadata.BEARER_AUTHENTICATION_MIME_TYPE)
                .rsocketStrategies(builder -> builder.encoder(new BearerTokenAuthenticationEncoder()))
                .transport(transport);
        requesterBuilder.route("test").retrieveFlux(byte[].class).subscribe(bytes -> log.info("rcvd: {}", new String(bytes)));
    }

Any pointers would be much appreciated!

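I'm answering my own question in case other searchers run into similar confusion.

The root problem was that I was reusing a single RSocketRequester.Builder to create multiple requesters for my multiple requests. This caused several problems; the most serious was that I was effectively (in the code above) calling setupMetadata(jwt, BearerTokenMetadata.BEARER_AUTHENTICATION_MIME_TYPE) multiple times on the same builder, which appends a JWT entry each time rather than setting or replacing it as I had expected.

The solution seems to be simply to create a new builder for each connection: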

RSocketRequester.builder()
     .dataMimeType(MediaType.APPLICATION_JSON)
     .setupMetadata(jwt, BearerTokenMetadata.BEARER_AUTHENTICATION_MIME_TYPE)
     .rsocketStrategies(b -> b.encoder(new BearerTokenAuthenticationEncoder()))
     .websocket(config.getUri())
     .route("test").retrieveFlux(byte[].class).subscribe();
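
To make the append-versus-replace behaviour concrete, here is a minimal plain-Java sketch. ToyRequesterBuilder is a hypothetical stand-in written only for this illustration; it is not Spring's RSocketRequester.Builder and makes no claim about how Spring is implemented internally. It only mimics the behaviour observed in the server logs: a shared builder accumulates setup metadata across calls, while a fresh builder per connection does not.

```java
import java.util.ArrayList;
import java.util.List;

public class BuilderReuseDemo {

    /** Hypothetical stand-in for a requester builder; NOT Spring's real class. */
    static final class ToyRequesterBuilder {
        private final List<String> setupMetadata = new ArrayList<>();

        // Mimics the observed behaviour: each call appends an entry
        // instead of replacing the previous one.
        ToyRequesterBuilder setupMetadata(String value) {
            setupMetadata.add(value);
            return this;
        }

        // Returns a snapshot of the metadata that a SETUP frame would carry.
        List<String> connect() {
            return List.copyOf(setupMetadata);
        }
    }

    // Reuses one builder for every connection (the problematic pattern).
    static List<String> connectWithSharedBuilder(ToyRequesterBuilder shared, String jwt) {
        return shared.setupMetadata(jwt).connect();
    }

    // Creates a new builder for every connection (the fix described above).
    static List<String> connectWithFreshBuilder(String jwt) {
        return new ToyRequesterBuilder().setupMetadata(jwt).connect();
    }

    public static void main(String[] args) {
        ToyRequesterBuilder shared = new ToyRequesterBuilder();
        System.out.println(connectWithSharedBuilder(shared, "jwt-USER1")); // [jwt-USER1]
        System.out.println(connectWithSharedBuilder(shared, "jwt-USER2")); // [jwt-USER1, jwt-USER2]

        System.out.println(connectWithFreshBuilder("jwt-USER1")); // [jwt-USER1]
        System.out.println(connectWithFreshBuilder("jwt-USER2")); // [jwt-USER2]
    }
}
```

The second shared-builder call carries both tokens, which matches the second SETUP frame in the log above, where the USER1 token comes first and is the one the server ends up decoding.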