Websocket 通信与 Netty 环境

Websocket communication versus Netty Environment

我需要实现两个 Java 环境之间的通信。接收者是一个 SpringBoot 响应式应用程序,处理通信的代码片段如下(我将跳过 bean 的配置)

@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    return webSocketSession.send(webSocketSession.receive() // <- Step 0
        .map(message -> {
            log.info("Step 1");
            return message.getPayloadAsText();
        })
        .map(message -> {
            log.info("Step 2");
            return webSocketSession.textMessage(this.receiveMessage(message));
        }));
}

客户端部分是使用 java 11

中的 Http API 实现的
WebSocket webSocket = HttpClient
    .newBuilder().executor(executor).build()
    .newWebSocketBuilder()
    .buildAsync(URI.create(url), new WebSocket.Listener() {
        @Override
        public void onOpen(WebSocket webSocket) {
            log.info("onOpen using subprotocol " + webSocket.getSubprotocol());
            WebSocket.Listener.super.onOpen(webSocket);
        }

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            log.info("onText received with data " + data);
            return WebSocket.Listener.super.onText(webSocket, data, last);
        }

        @Override
        public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
            log.info("Closed with status " + statusCode + ", reason: " + reason);
            return WebSocket.Listener.super.onClose(webSocket, statusCode, reason);
        }

        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            log.error("Error: " + error.getMessage());
            WebSocket.Listener.super.onError(webSocket, error);
        }

    }).join();


webSocket.sendText(toJSON(List.of("Lorem", "Ipsum", "dolor", "sit", "amet")), true);
webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok").thenRun(() -> log.info("Sent close"));

使用调试,我可以注意到一旦 join() 完成并返回 WebSocket 实例,接收器步骤 0 的方法将被执行,并且 Mono<Void> 实例是返回。

但问题是,即使我发送了一些文本,也永远不会执行步骤 1 和 2!

如果我尝试反向通信(将某些内容从 SpringBoot 应用程序发送到 Sender 应用程序),则会收到消息。

最后,这是 sendClose 语句后 onClose 回调执行的日志。

Closed with status 1002, reason: Server internal error

解决方案

由于 buildAsync 方法 return 是 CompletableFuture<WebSocket> 的实例,我们需要在使用 join()[=14= 刷新消息队列之前链式发送消息]

这里是解决方案

WebSocket webSocket = HttpClient
    .newBuilder().executor(executor).build()
    .newWebSocketBuilder()
    .buildAsync(URI.create(url), new WebSocket.Listener() {
        @Override
        public void onOpen(WebSocket webSocket) {
            log.info("onOpen using subprotocol " + webSocket.getSubprotocol());
            WebSocket.Listener.super.onOpen(webSocket);
        }

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            log.info("onText received with data " + data);
            return WebSocket.Listener.super.onText(webSocket, data, last);
        }

        @Override
        public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
            log.info("Closed with status " + statusCode + ", reason: " + reason);
            return WebSocket.Listener.super.onClose(webSocket, statusCode, reason);
        }

        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            log.error("Error: " + error.getMessage());
            WebSocket.Listener.super.onError(webSocket, error);
        }

    })
    .thenCompose(ws -> ws.sendText(toJSON(List.of("Lorem", "Ipsum", "dolor", "sit", "amet")), false))
    .thenCompose(ws -> ws.sendText(toJSON(List.of("Lorem", "Ipsum", "dolor", "sit", "amet")), true))
    .thenCompose(ws -> webSocket.sendClose(WebSocket.NORMAL_CLOSURE, ""))
    .join();