Spring 引导和 WebSocket - 使用 Stomp 协议通过安全的 websocket 发送高吞吐量时连接丢失

Spring Boot and WebSockets - got connection lost when sending high throuput through secured websocket using Stomp protocol

我有 2 个 WebSockets 端点:

registry.addEndpoint("/ws-handshake").withSockJS();
registry.addEndpoint("/api/admin/ws-handshake").withSockJS();

第一个没有身份验证,第二个使用 Spring 引导安全保护,使用与 HTTP 安全性相同的配置,并使用 OAuth2。授权 header 用于连接到安全端点。

我使用一个简单的 for 循环,因为这是一个 POC,但在生产中将使用相同数量的数据,return 很多记录,这是通过使用 SimpMessageSendingOperations class 由 Spring 提供 Boot:

    private SimpMessageSendingOperations messagingTemplate;

    @MessageMapping("/tail-topic")
    public void tailLogSendToTopic(@Payload WebSocketPoCPayload payload) throws InterruptedException {
        String topicName = "/topic/logentries";
        for (int i = 0; i < 3000; i++) {
            WebSocketPoCMessage message = new WebSocketPoCMessage(String.format("%d : returning payload %s", i, payload.getName()));
            messageTemplate.convertAndSend(topicName, message);
            // TimeUnit.MILLISECONDS.sleep(50);
        }
        log.info("done returning all messages");
    }

当我通过不安全的端点发送请求时,一切运行正常,我收到客户端的所有内容。

当我通过安全端点发送请求时,一切运行正常,直到达到某个有效载荷编号(例如 1499,始终是相同的有效载荷编号)。 除非我取消对超时的注释,但需要超过 10 毫秒,否则我会再次丢失连接,但在以后的数字上。

好像我的安全设置在他们看到太多流量时断开了连接,我是不是在我的安全配置中遗漏了什么?

由于对此似乎没有任何回应,我将接受临时解决方法作为解决方案:

private SimpMessageSendingOperations messagingTemplate;

@MessageMapping("/tail-topic")
public void tailLogSendToTopic(@Payload WebSocketPoCPayload payload) throws InterruptedException {
    String topicName = "/topic/logentries";
    for (int i = 0; i < 3000; i++) {
        WebSocketPoCMessage message = new WebSocketPoCMessage(String.format("%d : returning payload %s", i, payload.getName()));
        messageTemplate.convertAndSend(topicName, message);
        TimeUnit.MILLISECONDS.sleep(50);
    }
    log.info("done returning all messages");
}

我想我已经找到了解决这个问题的真正方法, 尽管该框架是非阻塞的,它仍然会在每条消息发送后等待 rabbitMQ stomp broker 的响应。 在高吞吐量下发生的情况是有时响应丢失并且它永远挂起等待,在 StompBrokerRelayMessageHandler 中,在 future.get() 行:

        try {
            ListenableFuture<Void> future = super.forward(message, accessor);
            if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) == null) {
                future.get();
            }
            return future;
        }
        catch (Throwable ex) {
            throw new MessageDeliveryException(message, ex);
        }

对我有用的解决方案是在不等待响应的情况下发送消息,方法是添加消息 header,如下所示:

    SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
    accessor.setHeader(SimpMessageHeaderAccessor.IGNORE_ERROR, true);
    accessor.setLeaveMutable(true);
    messagingTemplate.convertAndSend(destination, message, accessor.getMessageHeaders());

当然在这种情况下,一些消息可能会丢失,但对于我的用例来说已经足够了。

另一个有效的解决方案是用超时和单线程执行程序包装 convertAndSend 行,然后重试

干杯