Spring 引导和 WebSocket - 使用 Stomp 协议通过安全的 websocket 发送高吞吐量时连接丢失
Spring Boot and WebSockets - got connection lost when sending high throuput through secured websocket using Stomp protocol
我有 2 个 WebSockets 端点:
registry.addEndpoint("/ws-handshake").withSockJS();
registry.addEndpoint("/api/admin/ws-handshake").withSockJS();
第一个没有身份验证,第二个使用 Spring 引导安全保护,使用与 HTTP 安全性相同的配置,并使用 OAuth2。授权 header 用于连接到安全端点。
我使用一个简单的 for 循环,因为这是一个 POC,但在生产中将使用相同数量的数据,return 很多记录,这是通过使用 SimpMessageSendingOperations class 由 Spring 提供 Boot:
private SimpMessageSendingOperations messagingTemplate;
@MessageMapping("/tail-topic")
public void tailLogSendToTopic(@Payload WebSocketPoCPayload payload) throws InterruptedException {
String topicName = "/topic/logentries";
for (int i = 0; i < 3000; i++) {
WebSocketPoCMessage message = new WebSocketPoCMessage(String.format("%d : returning payload %s", i, payload.getName()));
messageTemplate.convertAndSend(topicName, message);
// TimeUnit.MILLISECONDS.sleep(50);
}
log.info("done returning all messages");
}
当我通过不安全的端点发送请求时,一切运行正常,我收到客户端的所有内容。
当我通过安全端点发送请求时,一切运行正常,直到达到某个有效载荷编号(例如 1499,始终是相同的有效载荷编号)。
除非我取消对超时的注释,但需要超过 10 毫秒,否则我会再次丢失连接,但在以后的数字上。
好像我的安全设置在他们看到太多流量时断开了连接,我是不是在我的安全配置中遗漏了什么?
由于对此似乎没有任何回应,我将接受临时解决方法作为解决方案:
private SimpMessageSendingOperations messagingTemplate;
@MessageMapping("/tail-topic")
public void tailLogSendToTopic(@Payload WebSocketPoCPayload payload) throws InterruptedException {
String topicName = "/topic/logentries";
for (int i = 0; i < 3000; i++) {
WebSocketPoCMessage message = new WebSocketPoCMessage(String.format("%d : returning payload %s", i, payload.getName()));
messageTemplate.convertAndSend(topicName, message);
TimeUnit.MILLISECONDS.sleep(50);
}
log.info("done returning all messages");
}
我想我已经找到了解决这个问题的真正方法,
尽管该框架是非阻塞的,它仍然会在每条消息发送后等待 rabbitMQ stomp broker 的响应。
在高吞吐量下发生的情况是有时响应丢失并且它永远挂起等待,在 StompBrokerRelayMessageHandler 中,在 future.get() 行:
try {
ListenableFuture<Void> future = super.forward(message, accessor);
if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) == null) {
future.get();
}
return future;
}
catch (Throwable ex) {
throw new MessageDeliveryException(message, ex);
}
对我有用的解决方案是在不等待响应的情况下发送消息,方法是添加消息 header,如下所示:
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
accessor.setHeader(SimpMessageHeaderAccessor.IGNORE_ERROR, true);
accessor.setLeaveMutable(true);
messagingTemplate.convertAndSend(destination, message, accessor.getMessageHeaders());
当然在这种情况下,一些消息可能会丢失,但对于我的用例来说已经足够了。
另一个有效的解决方案是用超时和单线程执行程序包装 convertAndSend 行,然后重试
干杯
我有 2 个 WebSockets 端点:
registry.addEndpoint("/ws-handshake").withSockJS();
registry.addEndpoint("/api/admin/ws-handshake").withSockJS();
第一个没有身份验证,第二个使用 Spring 引导安全保护,使用与 HTTP 安全性相同的配置,并使用 OAuth2。授权 header 用于连接到安全端点。
我使用一个简单的 for 循环,因为这是一个 POC,但在生产中将使用相同数量的数据,return 很多记录,这是通过使用 SimpMessageSendingOperations class 由 Spring 提供 Boot:
private SimpMessageSendingOperations messagingTemplate;
@MessageMapping("/tail-topic")
public void tailLogSendToTopic(@Payload WebSocketPoCPayload payload) throws InterruptedException {
String topicName = "/topic/logentries";
for (int i = 0; i < 3000; i++) {
WebSocketPoCMessage message = new WebSocketPoCMessage(String.format("%d : returning payload %s", i, payload.getName()));
messageTemplate.convertAndSend(topicName, message);
// TimeUnit.MILLISECONDS.sleep(50);
}
log.info("done returning all messages");
}
当我通过不安全的端点发送请求时,一切运行正常,我收到客户端的所有内容。
当我通过安全端点发送请求时,一切运行正常,直到达到某个有效载荷编号(例如 1499,始终是相同的有效载荷编号)。 除非我取消对超时的注释,但需要超过 10 毫秒,否则我会再次丢失连接,但在以后的数字上。
好像我的安全设置在他们看到太多流量时断开了连接,我是不是在我的安全配置中遗漏了什么?
由于对此似乎没有任何回应,我将接受临时解决方法作为解决方案:
private SimpMessageSendingOperations messagingTemplate;
@MessageMapping("/tail-topic")
public void tailLogSendToTopic(@Payload WebSocketPoCPayload payload) throws InterruptedException {
String topicName = "/topic/logentries";
for (int i = 0; i < 3000; i++) {
WebSocketPoCMessage message = new WebSocketPoCMessage(String.format("%d : returning payload %s", i, payload.getName()));
messageTemplate.convertAndSend(topicName, message);
TimeUnit.MILLISECONDS.sleep(50);
}
log.info("done returning all messages");
}
我想我已经找到了解决这个问题的真正方法, 尽管该框架是非阻塞的,它仍然会在每条消息发送后等待 rabbitMQ stomp broker 的响应。 在高吞吐量下发生的情况是有时响应丢失并且它永远挂起等待,在 StompBrokerRelayMessageHandler 中,在 future.get() 行:
try {
ListenableFuture<Void> future = super.forward(message, accessor);
if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) == null) {
future.get();
}
return future;
}
catch (Throwable ex) {
throw new MessageDeliveryException(message, ex);
}
对我有用的解决方案是在不等待响应的情况下发送消息,方法是添加消息 header,如下所示:
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
accessor.setHeader(SimpMessageHeaderAccessor.IGNORE_ERROR, true);
accessor.setLeaveMutable(true);
messagingTemplate.convertAndSend(destination, message, accessor.getMessageHeaders());
当然在这种情况下,一些消息可能会丢失,但对于我的用例来说已经足够了。
另一个有效的解决方案是用超时和单线程执行程序包装 convertAndSend 行,然后重试
干杯