Spring:通过 WebSocket 连接使用 SseEmitter

Spring SseEmitter over WebSocket connection

我的目标是从前端向后端发送一个请求,并接收多个响应。由于响应非常频繁,WebSocket 似乎是最合适的协议,所以我使用了 WebSocket,并在后端用 SseEmitter 发送多个响应。

这是我的请求控制器:

/**
 * Handles STOMP messages mapped to "/emitter" and starts a background worker
 * for the received {@code runData}.
 *
 * <p>The worker executes a {@code RemoteHostController}, then every two seconds,
 * while the controller reports active work, pushes its current output into the
 * emitter. When no work remains the emitter is completed; on any failure it is
 * completed with that error.
 *
 * <p>NOTE(review): the emitter is the return value of a {@code @SendTo} method, so
 * presumably the emitter object itself, not the events sent through it, is what
 * gets published on "/topic/response" - confirm.
 *
 * @param runData request payload bound from the client message
 * @return the emitter that the background worker writes to
 * @throws Exception declared by the signature
 */
@MessageMapping("/emitter")
@SendTo("/topic/response")
public SseEmitter output(RunData runData) throws Exception {
    final SseEmitter sse = new SseEmitter();

    Runnable worker = () -> {
        try {
            RemoteHostController remote = new RemoteHostController(runData);
            remote.execute();

            // Keep streaming snapshots until the controller reports no active work.
            while (remote.getActiveCount() > 0) {
                sse.send(remote.getAllOutput());
                Thread.sleep(2000);
            }

            sse.complete();
        } catch (Exception failure) {
            failure.printStackTrace();
            sse.completeWithError(failure);
        }
    };
    new Thread(worker).start();

    return sse;
}

RemoteHostController 负责管理与主机的连接,getAllOutput 返回主机的输出。

前端应用非常简单:index.html 使用 Stomp 和 SockJS 连接到 WebSocket,将数据发送到服务器,并用响应中的数据生成 <p> 标签:

/**
 * Opens a SockJS connection, layers a STOMP client on top of it and, once
 * connected, subscribes to '/topic/response' so that the body of every
 * message received there is rendered with showOutput().
 *
 * Stores the client in the shared `stompClient` variable and reports the
 * connected state through setConnected().
 */
function connect() {
        var sock = new SockJS('http://localhost:8080/emitter');
        stompClient = Stomp.over(sock);

        // Renders the body of each message published on the subscribed topic.
        function onResponse(greeting) {
            showOutput(greeting.body);
        }

        // Runs once the STOMP connection has been established.
        function onConnected(frame) {
            setConnected(true);
            console.log('Connected: ' + frame);
            stompClient.subscribe('/topic/response', onResponse);
        }

        stompClient.connect({}, onConnected);
}

/**
 * Reads the hostname, username, password and command form fields and sends
 * them as a JSON payload to the '/app/emitter' destination.
 */
function sendData() {
        // Returns the current value of the form field with the given id.
        function fieldValue(id) {
            return document.getElementById(id).value;
        }

        var payload = {
            'hostname': fieldValue('hostname'),
            'username': fieldValue('username'),
            'password': fieldValue('password'),
            'command': fieldValue('command')
        };

        stompClient.send("/app/emitter", {}, JSON.stringify(payload));
}

/**
 * Appends `message` as a new paragraph to the element with id 'response'.
 * The text is added as a text node, so it is not parsed as HTML.
 */
function showOutput(message) {
        var paragraph = document.createElement('p');
        paragraph.style.wordWrap = 'break-word';
        paragraph.appendChild(document.createTextNode(message));

        document.getElementById('response').appendChild(paragraph);
}

当我向后端发送数据时,我得到的唯一响应是:

{"timeout":null}

这是 SseEmitter 的 timeout 字段;当我修改超时时间时,返回的就是 {"timeout":<timeout_value>}。

我可以在日志中看到 RemoteHostController 正在连接到主机并正确执行命令。

我是不是做错了什么?或者WebSocket只支持一请求一响应通信?

下面是 WebSocket 和 SSE 的示例。如上所述,IE 浏览器不支持 SSE。为了完整起见,我尽量把相关代码都贴了出来。请确保在使用 SseEmitter 时没有使用 RestController,因为那样会直接返回该对象本身;根据上面的描述,我猜这正是你遇到的问题。

pom.xml

<!-- Dependencies used by the example. No versions are declared here, so they are
     presumably managed by a Spring Boot parent POM or BOM (confirm in the full pom.xml). -->
<dependencies>
    <!-- Spring Boot web starter: used by the MVC controller that returns the SseEmitter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Boot WebSocket starter: used by the STOMP/SockJS message-broker configuration -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
</dependencies>

WebSocket 配置:

/**
 * Enables STOMP messaging over WebSocket for the example.
 *
 * <p>Destinations prefixed with "/topic" are served by the simple message broker,
 * and clients connect through the SockJS-enabled "/socketrequest" endpoint.
 */
@Configuration
@EnableWebSocketMessageBroker
public class ApplicationWebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {

    /**
     * Enables the simple broker for destinations prefixed with "/topic".
     *
     * @param registry the broker registry supplied by the framework
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        super.configureMessageBroker(registry);
        registry.enableSimpleBroker("/topic");
    }

    /**
     * Registers the "/socketrequest" STOMP endpoint with SockJS enabled.
     *
     * @param stompEndpointRegistry the endpoint registry supplied by the framework
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
        stompEndpointRegistry.addEndpoint("/socketrequest").withSockJS();
    }
}

请求数据:

/**
 * Request payload carrying the two strings that the example controllers echo back.
 *
 * <p>Plain mutable bean. The accessors are the ones the controllers call
 * ({@code getString1}/{@code getString2} and {@code setString1}/{@code setString2}).
 */
public class RequestData {
    private String string1;
    private String string2;

    /**
     * @return the first string, or {@code null} if it has not been set
     */
    public String getString1() {
        return string1;
    }

    /**
     * @param string1 the first string to carry
     */
    public void setString1(String string1) {
        this.string1 = string1;
    }

    /**
     * @return the second string, or {@code null} if it has not been set
     */
    public String getString2() {
        return string2;
    }

    /**
     * @param string2 the second string to carry
     */
    public void setString2(String string2) {
        this.string2 = string2;
    }
}

WebSocket 控制器:

/**
 * STOMP controller for the WebSocket half of the example.
 *
 * <p>Each message received on "/processrequest" is answered with three separate
 * messages published to "/topic/response" from a newly started thread.
 */
@Controller
public class WebSocketController {
    @Autowired
    SimpMessagingTemplate simpMessagingTemplate;

    /**
     * Starts a new thread that publishes the responses for {@code requestData}.
     *
     * @param requestData the payload of the incoming message
     */
    @MessageMapping("/processrequest")
    void runWebSocket( RequestData requestData ) {
        Thread worker = new Thread(() -> publishResponses(requestData));
        worker.start();
    }

    /**
     * Publishes both request strings, followed by a fixed third message, to
     * "/topic/response".
     */
    private void publishResponses(RequestData requestData) {
        String topic = "/topic/response";
        simpMessagingTemplate.convertAndSend(topic, requestData.getString1());
        simpMessagingTemplate.convertAndSend(topic, requestData.getString2());
        simpMessagingTemplate.convertAndSend(topic, "A third response via websocket");
    }
}

SSE 控制器:

/**
 * MVC controller for the server-sent-events half of the example.
 *
 * <p>A request to "/emitter" returns an {@link SseEmitter} immediately, while a
 * newly started thread sends three events through it and then completes it.
 */
@Controller
public class SseController {

    /**
     * Wraps the two request parameters in a {@code RequestData}, starts the
     * sending thread and hands the emitter back to the caller.
     *
     * @param string1 value sent as the first event
     * @param string2 value sent as the second event
     * @return the emitter the background thread writes to
     */
    @RequestMapping("/emitter")
    public SseEmitter runEmitter(@RequestParam(value = "string1") String string1,
                                 @RequestParam(value = "string2") String string2)
    {
        RequestData payload = new RequestData();
        payload.setString1(string1);
        payload.setString2(string2);

        SseEmitter emitter = new SseEmitter();
        new Thread(() -> streamEvents(payload, emitter)).start();
        return emitter;
    }

    /**
     * Sends both payload strings and a fixed third message, then completes the
     * emitter. An I/O failure is printed and the emitter is completed with it.
     */
    private void streamEvents(RequestData payload, SseEmitter emitter) {
        try {
            emitter.send(payload.getString1());
            emitter.send(payload.getString2());
            emitter.send("A third response from SseEmitter");
            emitter.complete();
        } catch (IOException sendFailure) {
            sendFailure.printStackTrace();
            emitter.completeWithError(sendFailure);
        }
    }

}

HTML 代码:

    <script src="/javascript/sockjs-0.3.4.js"></script>
    <script src="/javascript/stomp.js"></script>

    <script type="text/javascript">
    // Shared STOMP client: assigned in connect() and used by doWebsocket().
    var stompClient = null;

    /**
     * Opens a SockJS connection to the '/socketrequest' endpoint, layers a STOMP
     * client on top of it and, once connected, subscribes to '/topic/response' so
     * that the body of every message received there is rendered with showOutput().
     */
    function connect() {
        var sock = new SockJS('http://localhost:8085/socketrequest');
        stompClient = Stomp.over(sock);

        // Renders the body of each message published on the subscribed topic.
        function onResponse(message) {
            showOutput(message.body);
        }

        // Runs once the STOMP connection has been established.
        function onConnected(frame) {
            console.log('Connected: ' + frame);
            stompClient.subscribe('/topic/response', onResponse);
        }

        stompClient.connect({}, onConnected);
    }

    /**
     * Sends a fixed two-string JSON payload to the '/processrequest' destination
     * through the shared STOMP client.
     */
    function doWebsocket() {
        var payload = { 'string1': 'The first string', 'string2' : 'The second string' };
        stompClient.send("/processrequest", {}, JSON.stringify(payload));
    }


    /**
     * Opens a server-sent-events stream to '/emitter' and renders the data of
     * every event with showOutput().
     *
     * The server completes the stream after its last event. An EventSource treats
     * a closed connection as an error and reconnects automatically, which would
     * re-run the request and show the same responses again. The source is
     * therefore closed on the first error or end of stream.
     */
    function doSse() {
        console.log("doSse");
        // Encode the values so spaces and reserved characters survive in the query string.
        var rtUrl = '/emitter?string1=' + encodeURIComponent('first string sse') +
                    '&string2=' + encodeURIComponent('second string sse');
        var source = new EventSource(rtUrl);
        source.onmessage = function(event) {
            showOutput(event.data);
        };
        source.onerror = function() {
            // Stop the automatic reconnect once the server has closed the stream.
            source.close();
        };
    }

    /**
     * Appends `message` as a new paragraph to the element with id 'response'.
     * The text is added as a text node, so it is not parsed as HTML.
     */
    function showOutput(message) {
        var paragraph = document.createElement('p');
        paragraph.style.wordWrap = 'break-word';
        paragraph.appendChild(document.createTextNode(message));

        document.getElementById('response').appendChild(paragraph);
    }

    // Open the STOMP connection as soon as this script runs.
    connect();

    </script>
    </head>

<!-- Static page heading -->
<div>
Starting page
</div>
<!-- Each button calls the matching script function: doWebsocket() or doSse() -->
<div>
    <button id="websocket" onclick="doWebsocket();">WebSocket</button>
    <button id="sse" onclick="doSse();">Server Side Events</button>
</div>
<!-- showOutput() appends one paragraph per received message to the element with id "response" -->
<div >
    Response:
    <p id="response"></p>
</div>

</html>