如何响应请求通过 Spring 响应式 websocket 推送数据?

How to push data over reactive websocket with Spring in response to request?

我正在使用 Spring Boot 2.1.3 开始使用反应式 websockets。我创建了一个 WebSocketHandler 实现,如下所示:

@Override
public Mono<Void> handle(WebSocketSession session) {

Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(1);
    var publisher = flux.map( o -> {
        try {
            return objectMapper.writeValueAsString(o);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            return null;
        }
    }).map(session::textMessage)
      .delayElements(Duration.ofSeconds(1));
    return session.send(publisher);
}

这有效,如果我连接,我在我的 websocket 客户端中每秒得到序列化 EfficiencyData

但是,我想对来自 websocket 的请求做出反应,告诉 service 我想要数据的 ID。我设法得到这样的请求信息:

@Override
public Mono<Void> handle(WebSocketSession session) {

    return session.send(session.receive().map(webSocketMessage -> {
        int id = Integer.parseInt(webSocketMessage.getPayloadAsText());

        return session.textMessage("Subscribing with id " + id);
    }));

现在我不知道如何结合这两个实现?

我希望做这样的事情:

@Override
public Mono<Void> handle(WebSocketSession session) {

    return session.send(session.receive().map(webSocketMessage -> {
        int id = Integer.parseInt(webSocketMessage.getPayloadAsText());

        Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
        var publisher = flux.map( o -> {
            try {
                return objectMapper.writeValueAsString(o);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
                return null;
            }
        }).map(session::textMessage)
                            .delayElements(Duration.ofSeconds(1));
        return publisher; //Does not compile
    }));

但这不会编译,因为 publisher 是一个 Flux<WebSocketMessage>,它应该是一个 Publisher<WebSocketMessage>。应该如何处理?

编辑:

根据 WebSocketHandler 的 Javadoc 示例,我尝试了这个:

@Override
public Mono<Void> handle(WebSocketSession session) {
    Flux<EfficiencyData> flux =
            session.receive()
                   .map(webSocketMessage -> Integer.parseInt(webSocketMessage.getPayloadAsText()))
                   .concatMap(service::subscribeToEfficiencyData);
    Mono<Void> input = flux.then();
    Mono<Void> output = session.send(flux.map(data -> session.textMessage(data.toString()))).then();
    return Mono.zip(input, output).then();
}

但这只会立即断开 websocket 客户端的连接,而无需执行任何操作。

使用 flatMapconcatMap 以扁平化 returned publisher

要解决您的问题,您必须使用允许平整 returned 值的运算符。例如

@Override
public Mono<Void> handle(WebSocketSession session) {

    return session.send(
       session.receive()
              .flatMap(webSocketMessage -> {
                  int id = Integer.parseInt(webSocketMessage.getPayloadAsText());

                  Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
                  var publisher = flux
                      .<String>handle((o, sink) -> {
                         try {
                            sink.next(objectMapper.writeValueAsString(o));
                         } catch (JsonProcessingException e) {
                            e.printStackTrace();
                            return; // null is prohibited in reactive-streams
                         }
                      })
                      .map(session::textMessage)
                      .delayElements(Duration.ofSeconds(1));

                  return publisher;
              })
    );
}

要点

  1. 如果return类型是流,使用flatMapconcatMap(见区别here
  2. 从不 returns Null。在反应流中 Null 是禁止值(参见规范规则 here
  3. 当映射可以以 null 结束时 -> 使用 handle 运算符。查看更多解释