从网络客户端 RSocketWebSocketClient 连接到 Spring-boot-Rsocket (Auth JWT) 时出错

Error connect to Spring-boot-Rsocket (Auth JWT) from web-client RSocketWebSocketClient

使用 spring-boot 客户端与服务器的连接工作正常:

public RSocketAdapter() throws IOException {
    requester = createRSocketRequesterBuilder()
    .connectWebSocket(URI.create("ws://localhost:7878/"))
    .block();
}
private RSocketRequester.Builder createRSocketRequesterBuilder() {
    RSocketStrategies strategies = RSocketStrategies.builder()
    .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
    .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
    .dataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT))
    .build();
    return RSocketRequester.builder().rsocketStrategies(strategies);
}

public Mono<HelloToken> signIn(String principal, String credential) {
    return requester
            .route("signin.v1")
            .data(HelloUser.builder().userId(principal).password(credential).build())
            .retrieveMono(HelloToken.class)
            .doOnNext(token -> {
                accessToken = token.getAccessToken();
            })
            .onErrorStop();
}

并且服务器收到这样的帧:

Correct byte frame

但是来自网络客户端的相同请求:

authSocketReactiv = () => {    
    const maxRSocketRequestN = 2147483647;    
    const keepAlive = 60000;
    const lifetime = 180000;
    const dataMimeType = 'application/json';
    const metadataMimeType = 'message/x.rsocket.authentication.bearer.v0';
    
    var client = new RSocketClient({
      serializers: {
        data: JsonSerializer,
        metadata: JsonSerializer,
      },
      setup: {
        dataMimeType,
        keepAlive,
        lifetime,
        metadataMimeType
      },
      transport: new RSocketWebSocketClient({
        url: 'ws://localhost:7878'                
      },Encoders)
    });

    // Open the connection
    client.connect().subscribe({
    onComplete: socket => {
      socket.requestStream({
        data:{
          'user_id': '0000',
          'password': 'Zero4'
        },
        metadata:'signin.v1'

      }).subscribe({
        onComplete: () => console.log('complete'),
        onError: error => {
          console.log(error);
        },
        onNext: payload => {
          console.log('Subscribe1');
        },
        onSubscribe: subscription => {
          console.log('Subscribe');
          subscription.request(2147483647);
        },
      });
    },
    onError: error => {
      console.log(error);
    },
    onSubscribe: cancel => {
     
    }
  });

形成不正确的框架并以“元数据格式错误错误”落下:

Error byte frame from web

这里应该使用什么编码或缓冲选项?感谢任何提示和建议。

您可能想要使用复合元数据并将 metadataMimeType 设置为 MESSAGE_RSOCKET_COMPOSITE_METADATA.string

路由元数据很重要,它告诉服务器如何路由传入的 RSocket 请求。

我没有仔细研究您在 Whosebug 上链接的服务器示例代码,但只要查看您的示例代码,您就可以像这样使用 requestStream 提供路由元数据:

此外,您列出的 example project 虽然将 signin 引用为 request/response,因此您实际上不想要 requestStream,而是 requestResponse

socket
  .requestResponse({
    data: Buffer.from(JSON.stringify({
      user_id: '0000',
      password: 'Zero4'
    })),
    metadata: encodeCompositeMetadata([
      [MESSAGE_RSOCKET_ROUTING, encodeRoute("signin.v1")],
    ]),
  })

您可能想要使用 BufferEncoders,如此 example 所示。此外,我认为您不应该对元数据使用 JsonSerializer,而应该使用 IdentitySerializer,这将直接传递复合元数据缓冲区,而不是尝试序列化到 JSON 或从 JSON 序列化。

您可能仍然 运行 遇到一些问题,但我怀疑这会让您克服 metadata is malformed ERROR 错误。

希望对您有所帮助。

非常感谢您的详细建议。根据指示,这个 complined 解决方案适用于我的情况:

  getAuthToken = () => {    

    const maxRSocketRequestN = 2147483647;    
    const keepAlive = 60000;
    const lifetime = 180000;
    const dataMimeType = APPLICATION_JSON.string;
    const metadataMimeType = MESSAGE_RSOCKET_COMPOSITE_METADATA.string;

    var client = new RSocketClient({
      serializers: {
        data: IdentitySerializer,
        metadata: IdentitySerializer,
      },
      setup: {
        dataMimeType,
        keepAlive,
        lifetime,
        metadataMimeType
      },
      transport: new RSocketWebSocketClient({
        url: 'ws://localhost:7878'                
      },BufferEncoders)
    });
    
    client.connect().then(
      (socket) => {
        socket.requestResponse({

          data: Buffer.from(JSON.stringify({
            user_id: '0000',
            password: 'Zero4'
          })),
          metadata: encodeCompositeMetadata([
            [MESSAGE_RSOCKET_ROUTING, encodeRoute("signin.v1")],
          ]),

        }).subscribe({ 
         onComplete: (data) => console.log(data),
         onError: error =>
            console.error(`Request-stream error:${error.message}`),
        });
      },
      
      (error) => {
        console.log("composite initial connection failed");
      }
    );