如何使用 RSocket 处理从服务器发送到客户端的消息?

How to handle message sent from server to client with RSocket?

我尝试使用RSocketRequester从服务器向特定客户端发送消息,但我不知道如何在前端处理它。服务器是 Spring Webflux,控制器是这样的:

data class Message(val message: String)

@Controller
class RSocketController {

    private val log = LoggerFactory.getLogger(RSocketController::class.java)

    @MessageMapping("say.hello")
    fun sayHello(message: String): Flux<Message> {
        log.info("say hello {}", message)
        return Flux.just(Message("server says hello"))
    }

    @MessageMapping("say.hi")
    fun sayHi(message: String, rSocketRequester: RSocketRequester): Flux<Message> {
        log.info("say hi {}", message)
        rSocketRequester
                .route("say.hello")
                .data(Message("server says hi hello ;)"))
                .send()
                .subscribe()
        return Flux.just(Message("server says hi!!"))
    }

}

在前端我使用 rsocket-jssayHello 方法工作得很好(请求流),但是当我调用 sayHi 方法时,我想从服务器发送两条消息。第一个到 say.hello 端点,第二个到 say.hi 端点。我有这样的 rsocket-js 实现:

    sayHello() {
      console.log("say hello");
      this.requestStream("say.hello");
    },
    sayHi() {
      console.log("say hi");
      this.requestStream("say.hi");
    },
    connect() {
      const transport = new RSocketWebSocketClient({
        url: "ws://localhost:8080/rsocket"
      });
      const client = new RSocketClient({        
        serializers: {
          data: JsonSerializer,
          metadata: IdentitySerializer
        },
        setup: {                  
          keepAlive: 60000,          
          lifetime: 180000,
          dataMimeType: "application/json",
          metadataMimeType: "message/x.rsocket.routing.v0"
        },
        transport
      });
      client.connect().subscribe({
        onComplete: socket => {
          this.socket = socket;
          console.log("complete connection");          
        },
        onError: error => {
          console.log("got connection error");
          console.error(error);
        },
        onSubscribe: cancel => {
          console.log("subscribe connection");
          console.log(cancel);          
        }        
      });
    },    
    requestStream(url) {
      if (this.socket) {
        this.socket
          .requestStream({
            data: url + " from client",
            metadata: String.fromCharCode(url.length) + url
          })
          .subscribe({
            onComplete: () => console.log("requestStream done"),
            onError: error => {
              console.log("got error with requestStream");
              console.error(error);
            },
            onNext: value => {
              // console.log("got next value in requestStream..");
              console.log("got data from sever");
              console.log(value.data);
            },
            // Nothing happens until `request(n)` is called
            onSubscribe: sub => {
              console.log("subscribe request Stream!");
              sub.request(2147483647);
              // sub.request(3);
            }
          });
      } else {
        console.log("not connected...");
      }
    }

我可以在 Google Chrome DevTools -> Network -> rsocket 中看到这两条消息。所以客户端收到了它们,但我无法在代码中捕捉到 RSocketRequester 发送的代码。 似乎服务器使用 fireAndForget 方法。客户端如何处理?

正如@VladMamaev 所说,我们可以像本例中那样向客户端提供响应程序 https://github.com/rsocket/rsocket-js/blob/master/packages/rsocket-examples/src/LeaseClientExample.js#L104

对我来说,fireAndForget方法就够了。

export class EchoResponder {
  constructor(callback) {
    this.callback = callback;
  }
  fireAndForget(payload) {  
    this.callback(payload);
  }  
}

import { EchoResponder } from "~/assets/EchoResponder";
...

  const messageReceiver = payload => {
    //do what you want to do with received message  
    console.log(payload)    
  };
  const responder = new EchoResponder(messageReceiver);


  connect() {
  const transport = new RSocketWebSocketClient({
    url: "ws://localhost:8080/rsocket"
  });
  const client = new RSocketClient({        
    serializers: {
      data: JsonSerializer,
      metadata: IdentitySerializer
    },
    setup: {                  
      keepAlive: 60000,          
      lifetime: 180000,
      dataMimeType: "application/json",
      metadataMimeType: "message/x.rsocket.routing.v0"
    },
    responder: responder,
    transport
  });