How to use Spring Reactive WebSocket and transform it into a Flux stream?

The Spring documentation has a WebSocketClient example:

WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute("ws://localhost:8080/echo", session -> {...}).blockMillis(5000);

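I'm not sure how to handle the incoming data stream inside that block {...}.

I mean: how can I filter the incoming data and convert it into a Flux?

Here is what I would like to get: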

@GetMapping(value = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<MyRecourse> getStreaming() {

    //  get some data from WebSocket (CoinCap service).
    //  Transform that data into MyRecourse object
    //  Return stream to a client

}

Take a look at the WebSocketSession parameter of the WebSocketHandler.handle() lambda:

/**
 * Get the flux of incoming messages.
 */
Flux<WebSocketMessage> receive();

See the Spring WebFlux Workshop for more information.

UPDATE

Let's try this!
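
Since receive() is an ordinary Flux, the filtering and conversion asked about in the question can be done with the usual Reactor operators. Below is a minimal sketch that needs only reactor-core. The MyRecourse class here is a hypothetical stand-in that merely wraps the raw text, and the "drop blank payloads" rule is an arbitrary example; the real CoinCap message format is not shown in this thread, so no parsing is attempted.

```java
import reactor.core.publisher.Flux;

final class PayloadMapping {

    // Hypothetical stand-in for the question's MyRecourse type: it only wraps the raw text.
    public static final class MyRecourse {
        private final String payload;

        public MyRecourse(String payload) {
            this.payload = payload;
        }

        public String getPayload() {
            return payload;
        }
    }

    // Drops blank payloads and wraps the remaining ones.
    // The filter rule is an assumption chosen only for illustration.
    public static Flux<MyRecourse> toResources(Flux<String> payloads) {
        return payloads
                .filter(text -> !text.trim().isEmpty())
                .map(MyRecourse::new);
    }
}
```

Inside the handler this could be applied as PayloadMapping.toResources(session.receive().map(WebSocketMessage::getPayloadAsText)).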

    Mono<Void> sessionMono =
            client.execute(new URI("ws://localhost:8080/echo"),
                    session ->
                            Mono.empty()
                                    .subscriberContext(Context.of(WebSocketSession.class, session))
                                    .then());

    return sessionMono
            .thenMany(
                    Mono.subscriberContext()
                            .flatMapMany(c -> c
                                    .get(WebSocketSession.class)
                                    .receive()))
            .map(WebSocketMessage::getPayloadAsText);

UPDATE 2

Or another option, but with a blocked subscription:

    EmitterProcessor<String> output = EmitterProcessor.create();

    client.execute(new URI("ws://localhost:8080/echo"),
            session ->
                    session.receive()
                            .map(WebSocketMessage::getPayloadAsText)
                            .subscribeWith(output)
                            .then())
            .block(Duration.ofMillis(5000));

    return output;

UPDATE 3

A working Spring Boot application on the matter: https://github.com/artembilan/webflux-websocket-demo

The main code is like this:

    EmitterProcessor<String> output = EmitterProcessor.create();

    Mono<Void> sessionMono =
            client.execute(new URI("ws://localhost:8080/echo"),
                    session -> session.receive()
                            .map(WebSocketMessage::getPayloadAsText)
                            .subscribeWith(output)
                            .then());

    return output.doOnSubscribe(s -> sessionMono.subscribe());
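
A note for newer Reactor versions: EmitterProcessor was deprecated in Reactor 3.4 and removed in 3.5, with the Sinks API as its replacement. The following is a rough sketch of the same "connect on first subscribe" idea using Sinks.many(), assuming Reactor 3.4 or later. SinkBridge, pipe and bridge are hypothetical names introduced only for this illustration, and the block depends on reactor-core alone so it can be tried without a WebSocket server.

```java
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

final class SinkBridge {

    // Forwards every signal of `incoming` into `sink`.
    // The returned Mono terminates when `incoming` terminates.
    public static Mono<Void> pipe(Flux<String> incoming, Sinks.Many<String> sink) {
        return incoming
                .doOnNext(sink::tryEmitNext)
                .doOnError(sink::tryEmitError)
                .doOnComplete(sink::tryEmitComplete)
                .then();
    }

    // Creates a sink, builds the (still cold) session Mono from it, and
    // subscribes to that Mono only when the returned Flux gets a subscriber.
    public static Flux<String> bridge(Function<Sinks.Many<String>, Mono<Void>> openSession) {
        Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
        Mono<Void> sessionMono = openSession.apply(sink);
        return sink.asFlux().doOnSubscribe(s -> sessionMono.subscribe());
    }

    // Possible wiring with the client from the snippet above (not compiled here):
    //
    // Flux<String> output = SinkBridge.bridge(sink ->
    //         client.execute(URI.create("ws://localhost:8080/echo"),
    //                 session -> SinkBridge.pipe(
    //                         session.receive().map(WebSocketMessage::getPayloadAsText),
    //                         sink)));
}
```

The multicast sink with onBackpressureBuffer() was picked because it keeps elements emitted before the first subscriber is registered, which is close to how EmitterProcessor behaves; other sink flavors may fit better depending on how many subscribers are expected.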