WebSocket 关闭并出现协议错误 1002
WebSocket Closes with Protocol Error 1002
我正在实现 WebSocket 消息命令行客户端。
我已经检查过这个错误对应于协议的问题。我将 ws 升级到最新的 7.4.1。在后端,我使用 Spring Boot Websockets 2.3.4.RELEASE.
据说造成这种情况的 2 个主要原因是数据包丢失或格式错误的消息。
我已经做了一些检查来检查这些但 none 似乎有效。
我测试的消息很小,所以消息大小不应该是这种情况。连接完全在本地主机上。
我用 3 个用户测试了该解决方案,有时我得到这个错误,有时却没有。
谁能帮我弄清楚如何消除此类错误?
这是我用于客户端发送消息的代码:
async function test(number_of_messages, break_between_messages) {
const websocket = new WebSocket(url...)
websocket.on('message', function incoming(data) {
console.log(getMessage("Received", data))
});
websocket.on('close', function(data) {
console.log('Disconnected!!!! ' + data.toString());
});
const opened = await connection(websocket)
//Wait 5 seconds
await sleep(5_000);
if (opened) {
for (i = 0; i < number_of_messages; i++) {
for (const chatId of chatIds) {
let content = i.toString() + " from " + user;
let msg = JSON.stringify({
"chatId": chatId,
"author": user,
"content": content
})
websocket.send(msg)
let message = getMessage("Sent", msg)
console.log(message)
}
await sleep(break_between_messages);
}
} else {
console.log("ERROR on Opening Connection")
return
}
// Wait 1 minute
await sleep(60_000);
websocket.close()
}
后端代码:
@Component
@ServerEndpoint(value = "/webSocket/{username}",
encoders = MessageRepresentationEncoder.class, decoders = MessageRepresentationDecoder.class)
public class MessagingSocket {
private Logger logger = LoggerFactory.getInstance();
private Session session;
private MessagingAPI messagingAPI = MessagingAPIFactory.createAPI();
private UserSocketRegistry userSocketRegistry = UserSocketRegistry.createRegistry();
private SessionUserRegistry sessionUserRegistry = SessionUserRegistry.createRegistry();
@OnOpen
public void onOpen(Session session, @PathParam("username") String username) {
this.session = session;
logger.log(LoggingType.INFO, "Started new session " + session.getId());
logger.log(LoggingType.INFO, username + " connected");
userSocketRegistry.addSessionForUser(this, username);
sessionUserRegistry.addSessionForUser(session, username);
}
@OnMessage //Allows the client to send message to the socket.
public void onMessage(MessageRepresentation messageRepresentation) {
logger.log(LoggingType.INFO, "Received " + messageRepresentation.toString());
messagingAPI.write(WriteMessage.from(UUID.fromString(messageRepresentation.chatId), messageRepresentation.author, messageRepresentation.content));
broadcastToChat(messageRepresentation);
}
private void broadcastToChat(MessageRepresentation message) {
final List<MessagingSocket> sockets = messagingAPI.getUsersConnectedToChat(UUID.fromString(message.chatId)).stream().filter(user -> userSocketRegistry.hasSocketFor(user.getName()))
.map(user -> userSocketRegistry.getSocketFor(user.getName())).collect(Collectors.toList());
logger.log(LoggingType.INFO, "Starting broadcast of " + message.content + " from " + message.author + " for " + String.join(",", messagingAPI.getUsersConnectedToChat(UUID.fromString(message.chatId)).stream().map(x -> x.getName()).collect(Collectors.toList())));
for (MessagingSocket messagingSocket : sockets) {
logger.log(LoggingType.INFO, "Broadcasting message" + message.content + " to " + messagingSocket.session.getId());
messagingSocket.sendMessage(message);
}
}
private void sendMessage(MessageRepresentation message) {
try {
this.session.getBasicRemote().sendObject(message);
} catch (IOException | EncodeException e) {
logger.log(LoggingType.ERROR, "Caught exception while sending message to Session Id: " + this.session.getId());
}
}
@OnClose
public void onClose(Session session) {
String user = sessionUserRegistry.getUserFor(session);
logger.log(LoggingType.INFO, "User " + user + " with session " + this.session.getId() + " disconnected ");
sessionUserRegistry.removeSession(session);
userSocketRegistry.removeUser(user);
}
}
MessageRepresentation 为:
public class MessageRepresentation {
public String chatId;
public String author;
public String content;
@Override
public String toString() {
return "MessageRepresentation{" +
"chatId='" + chatId + '\'' +
", author='" + author + '\'' +
", content='" + content + '\'' +
'}';
}
}
经过进一步调查,我收到以下异常:
java.lang.IllegalStateException: The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for called method
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.checkState(WsRemoteEndpointImplBase.java:1243)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.textStart(WsRemoteEndpointImplBase.java:1205)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendString(WsRemoteEndpointImplBase.java:191)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendObject(WsRemoteEndpointImplBase.java:600)
at org.apache.tomcat.websocket.WsRemoteEndpointBasic.sendObject(WsRemoteEndpointBasic.java:74)
at presentation.frontend.websockets.server.MessagingSocket.sendMessage(MessagingSocket.java:64)
at presentation.frontend.websockets.server.MessagingSocket.broadcastToChat(MessagingSocket.java:57)
at presentation.frontend.websockets.server.MessagingSocket.onMessage(MessagingSocket.java:47)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.tomcat.websocket.pojo.PojoMessageHandlerWholeBase.onMessage(PojoMessageHandlerWholeBase.java:80)
at org.apache.tomcat.websocket.WsFrameBase.sendMessageText(WsFrameBase.java:402)
at org.apache.tomcat.websocket.server.WsFrameServer.sendMessageText(WsFrameServer.java:119)
at org.apache.tomcat.websocket.WsFrameBase.processDataText(WsFrameBase.java:502)
at org.apache.tomcat.websocket.WsFrameBase.processData(WsFrameBase.java:301)
at org.apache.tomcat.websocket.WsFrameBase.processInputBuffer(WsFrameBase.java:133)
at org.apache.tomcat.websocket.server.WsFrameServer.onDataAvailable(WsFrameServer.java:82)
at org.apache.tomcat.websocket.server.WsFrameServer.doOnDataAvailable(WsFrameServer.java:171)
at org.apache.tomcat.websocket.server.WsFrameServer.notifyDataAvailable(WsFrameServer.java:151)
at org.apache.tomcat.websocket.server.WsHttpUpgradeHandler.upgradeDispatch(WsHttpUpgradeHandler.java:148)
at org.apache.coyote.http11.upgrade.UpgradeProcessorInternal.dispatch(UpgradeProcessorInternal.java:54)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:59)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1590)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.base/java.lang.Thread.run(Thread.java:834)
java.lang.IllegalStateException: 由于WebSocket 会话已关闭,消息将不会发送
我在这里可能偏离了标记,但我认为这可能是由于消息包含无效的 UTF8 或类似内容。即畸形。
如果这听起来像是原因,那么简单的解决方法就是对 msg
进行编码
let msg = JSON.stringify({
"chatId": chatId,
"author": user,
"content": content
})
到
let msg = unescape(encodeURIComponent(JSON.stringify({
"chatId": chatId,
"author": user,
"content": content
})));
然后在另一边解码...
JSON.parse(decodeURIComponent(escape( ... )))
这一问题的解决方案涉及 2 个步骤。
1:查找错误堆栈跟踪
@OnError
public void onError(Session session, Throwable throwable) {
logger.log(LoggingType.ERROR, "Error for " + session.getId() + " caused by: " + throwable.getMessage());
throwable.printStackTrace();
}
2:在广播消息中将同步BasicRemote改为异步AsyncRemote(当消息数量增加时这一点很重要)
private void sendMessage(MessageRepresentation message) {
this.session.getAsyncRemote().sendObject(message);
}
我正在实现 WebSocket 消息命令行客户端。 我已经检查过这个错误对应于协议的问题。我将 ws 升级到最新的 7.4.1。在后端,我使用 Spring Boot Websockets 2.3.4.RELEASE.
据说造成这种情况的 2 个主要原因是数据包丢失或格式错误的消息。 我已经做了一些检查来检查这些但 none 似乎有效。 我测试的消息很小,所以消息大小不应该是这种情况。连接完全在本地主机上。 我用 3 个用户测试了该解决方案,有时我得到这个错误,有时却没有。
谁能帮我弄清楚如何消除此类错误?
这是我用于客户端发送消息的代码:
async function test(number_of_messages, break_between_messages) {
const websocket = new WebSocket(url...)
websocket.on('message', function incoming(data) {
console.log(getMessage("Received", data))
});
websocket.on('close', function(data) {
console.log('Disconnected!!!! ' + data.toString());
});
const opened = await connection(websocket)
//Wait 5 seconds
await sleep(5_000);
if (opened) {
for (i = 0; i < number_of_messages; i++) {
for (const chatId of chatIds) {
let content = i.toString() + " from " + user;
let msg = JSON.stringify({
"chatId": chatId,
"author": user,
"content": content
})
websocket.send(msg)
let message = getMessage("Sent", msg)
console.log(message)
}
await sleep(break_between_messages);
}
} else {
console.log("ERROR on Opening Connection")
return
}
// Wait 1 minute
await sleep(60_000);
websocket.close()
}
后端代码:
@Component
@ServerEndpoint(value = "/webSocket/{username}",
encoders = MessageRepresentationEncoder.class, decoders = MessageRepresentationDecoder.class)
public class MessagingSocket {
private Logger logger = LoggerFactory.getInstance();
private Session session;
private MessagingAPI messagingAPI = MessagingAPIFactory.createAPI();
private UserSocketRegistry userSocketRegistry = UserSocketRegistry.createRegistry();
private SessionUserRegistry sessionUserRegistry = SessionUserRegistry.createRegistry();
@OnOpen
public void onOpen(Session session, @PathParam("username") String username) {
this.session = session;
logger.log(LoggingType.INFO, "Started new session " + session.getId());
logger.log(LoggingType.INFO, username + " connected");
userSocketRegistry.addSessionForUser(this, username);
sessionUserRegistry.addSessionForUser(session, username);
}
@OnMessage //Allows the client to send message to the socket.
public void onMessage(MessageRepresentation messageRepresentation) {
logger.log(LoggingType.INFO, "Received " + messageRepresentation.toString());
messagingAPI.write(WriteMessage.from(UUID.fromString(messageRepresentation.chatId), messageRepresentation.author, messageRepresentation.content));
broadcastToChat(messageRepresentation);
}
private void broadcastToChat(MessageRepresentation message) {
final List<MessagingSocket> sockets = messagingAPI.getUsersConnectedToChat(UUID.fromString(message.chatId)).stream().filter(user -> userSocketRegistry.hasSocketFor(user.getName()))
.map(user -> userSocketRegistry.getSocketFor(user.getName())).collect(Collectors.toList());
logger.log(LoggingType.INFO, "Starting broadcast of " + message.content + " from " + message.author + " for " + String.join(",", messagingAPI.getUsersConnectedToChat(UUID.fromString(message.chatId)).stream().map(x -> x.getName()).collect(Collectors.toList())));
for (MessagingSocket messagingSocket : sockets) {
logger.log(LoggingType.INFO, "Broadcasting message" + message.content + " to " + messagingSocket.session.getId());
messagingSocket.sendMessage(message);
}
}
private void sendMessage(MessageRepresentation message) {
try {
this.session.getBasicRemote().sendObject(message);
} catch (IOException | EncodeException e) {
logger.log(LoggingType.ERROR, "Caught exception while sending message to Session Id: " + this.session.getId());
}
}
@OnClose
public void onClose(Session session) {
String user = sessionUserRegistry.getUserFor(session);
logger.log(LoggingType.INFO, "User " + user + " with session " + this.session.getId() + " disconnected ");
sessionUserRegistry.removeSession(session);
userSocketRegistry.removeUser(user);
}
}
MessageRepresentation 为:
public class MessageRepresentation {
public String chatId;
public String author;
public String content;
@Override
public String toString() {
return "MessageRepresentation{" +
"chatId='" + chatId + '\'' +
", author='" + author + '\'' +
", content='" + content + '\'' +
'}';
}
}
经过进一步调查,我收到以下异常:
java.lang.IllegalStateException: The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for called method
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.checkState(WsRemoteEndpointImplBase.java:1243)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.textStart(WsRemoteEndpointImplBase.java:1205)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendString(WsRemoteEndpointImplBase.java:191)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendObject(WsRemoteEndpointImplBase.java:600)
at org.apache.tomcat.websocket.WsRemoteEndpointBasic.sendObject(WsRemoteEndpointBasic.java:74)
at presentation.frontend.websockets.server.MessagingSocket.sendMessage(MessagingSocket.java:64)
at presentation.frontend.websockets.server.MessagingSocket.broadcastToChat(MessagingSocket.java:57)
at presentation.frontend.websockets.server.MessagingSocket.onMessage(MessagingSocket.java:47)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.tomcat.websocket.pojo.PojoMessageHandlerWholeBase.onMessage(PojoMessageHandlerWholeBase.java:80)
at org.apache.tomcat.websocket.WsFrameBase.sendMessageText(WsFrameBase.java:402)
at org.apache.tomcat.websocket.server.WsFrameServer.sendMessageText(WsFrameServer.java:119)
at org.apache.tomcat.websocket.WsFrameBase.processDataText(WsFrameBase.java:502)
at org.apache.tomcat.websocket.WsFrameBase.processData(WsFrameBase.java:301)
at org.apache.tomcat.websocket.WsFrameBase.processInputBuffer(WsFrameBase.java:133)
at org.apache.tomcat.websocket.server.WsFrameServer.onDataAvailable(WsFrameServer.java:82)
at org.apache.tomcat.websocket.server.WsFrameServer.doOnDataAvailable(WsFrameServer.java:171)
at org.apache.tomcat.websocket.server.WsFrameServer.notifyDataAvailable(WsFrameServer.java:151)
at org.apache.tomcat.websocket.server.WsHttpUpgradeHandler.upgradeDispatch(WsHttpUpgradeHandler.java:148)
at org.apache.coyote.http11.upgrade.UpgradeProcessorInternal.dispatch(UpgradeProcessorInternal.java:54)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:59)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1590)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.base/java.lang.Thread.run(Thread.java:834)
java.lang.IllegalStateException: 由于WebSocket 会话已关闭,消息将不会发送
我在这里可能偏离了标记,但我认为这可能是由于消息包含无效的 UTF8 或类似内容。即畸形。
如果这听起来像是原因,那么简单的解决方法就是对 msg
let msg = JSON.stringify({
"chatId": chatId,
"author": user,
"content": content
})
到
let msg = unescape(encodeURIComponent(JSON.stringify({
"chatId": chatId,
"author": user,
"content": content
})));
然后在另一边解码...
JSON.parse(decodeURIComponent(escape( ... )))
这一问题的解决方案涉及 2 个步骤。
1:查找错误堆栈跟踪
@OnError
public void onError(Session session, Throwable throwable) {
logger.log(LoggingType.ERROR, "Error for " + session.getId() + " caused by: " + throwable.getMessage());
throwable.printStackTrace();
}
2:在广播消息中将同步BasicRemote改为异步AsyncRemote(当消息数量增加时这一点很重要)
private void sendMessage(MessageRepresentation message) {
this.session.getAsyncRemote().sendObject(message);
}