Spring WebSocket @SendToSession:向特定会话发送消息

Spring WebSocket @SendToSession: send message to specific session

是否可以向特定会话发送消息?

我在客户端和 Spring servlet 之间有一个未经身份验证的 websocket。当异步作业结束时,我需要向特定连接发送未经请求的消息。

@Controller
public class WebsocketTest {


     @Autowired
    public SimpMessageSendingOperations messagingTemplate;

    ExecutorService executor = Executors.newSingleThreadExecutor();

    @MessageMapping("/start")
    public void start(SimpMessageHeaderAccessor accessor) throws Exception {
        String applicantId=accessor.getSessionId();        
        executor.submit(() -> {
            //... slow job
            jobEnd(applicantId);
        });
    }

    public void jobEnd(String sessionId){
        messagingTemplate.convertAndSend("/queue/jobend"); //how to send only to that session?
    }
}

如您在此代码中所见,客户端可以启动一个异步作业,当它完成时,它需要结束消息。显然,我只需要向申请人发送消息,而不是广播给所有人。 最好有一个 @SendToSession 注释或 messagingTemplate.convertAndSendToSession 方法。

更新

我试过这个:

messagingTemplate.convertAndSend("/queue/jobend", true, Collections.singletonMap(SimpMessageHeaderAccessor.SESSION_ID_HEADER, sessionId));

但这会广播到所有会话,而不仅仅是指定的会话。

更新 2

使用 convertAndSendToUser() 方法进行测试。 本次测试和破解官方Spring教程:https://spring.io/guides/gs/messaging-stomp-websocket/

这是服务器代码:

@Controller
public class WebsocketTest {

    @PostConstruct
    public void init(){
        ScheduledExecutorService statusTimerExecutor=Executors.newSingleThreadScheduledExecutor();
        statusTimerExecutor.scheduleAtFixedRate(new Runnable() {                
            @Override
            public void run() {
                messagingTemplate.convertAndSendToUser("1","/queue/test", new Return("test"));
            }
        }, 5000,5000, TimeUnit.MILLISECONDS);
    } 

     @Autowired
        public SimpMessageSendingOperations messagingTemplate;
}

这是客户端代码:

function connect() {
            var socket = new WebSocket('ws://localhost:8080/hello');
            stompClient = Stomp.over(socket);
            stompClient.connect({}, function(frame) {
                setConnected(true);
                console.log('Connected: ' + frame);
                stompClient.subscribe('/user/queue/test', function(greeting){
                    console.log(JSON.parse(greeting.body));
                });
            });
        }

不幸的是,客户端没有按预期每 5000 毫秒收到一次会话回复。我确定“1”是连接的第二个客户端的有效 sessionId,因为我在 SimpMessageHeaderAccessor.getSessionId()

的调试模式下看到它

背景情景

我想为远程作业创建一个进度条,客户端向服务器请求异步作业,并通过从服务器发送的 websocket 消息检查其进度。这不是文件上传而是远程计算,因此只有服务器知道每个作业的进度。 我需要向特定会话发送消息,因为每个作业都是由会话启动的。 客户端请求远程计算 服务器启动此作业,并为每个作业步骤回复申请客户端及其作业进度状态。 客户端获取有关其工作的消息并建立一个 progress/status 栏。 这就是为什么我需要每个会话的消息。 我也可以使用每个用户的消息,但 Spring 不提供 每个用户的主动消息。 (Cannot send user message with Spring Websocket)

可行的解决方案

 __      __ ___   ___  _  __ ___  _  _   ___      ___   ___   _    _   _  _____  ___  ___   _  _ 
 \ \    / // _ \ | _ \| |/ /|_ _|| \| | / __|    / __| / _ \ | |  | | | ||_   _||_ _|/ _ \ | \| |
  \ \/\/ /| (_) ||   /| ' <  | | | .` || (_ |    \__ \| (_) || |__| |_| |  | |   | || (_) || .` |
   \_/\_/  \___/ |_|_\|_|\_\|___||_|\_| \___|    |___/ \___/ |____|\___/   |_|  |___|\___/ |_|\_|

从 UPDATE2 解决方案开始,我必须使用最后一个参数 (MessageHeaders) 完成 convertAndSendToUser 方法:

messagingTemplate.convertAndSendToUser("1","/queue/test", new Return("test"), createHeaders("1"));

其中createHeaders()是这个方法:

private MessageHeaders createHeaders(String sessionId) {
        SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
        headerAccessor.setSessionId(sessionId);
        headerAccessor.setLeaveMutable(true);
        return headerAccessor.getMessageHeaders();
    }

这非常复杂,在我看来,不值得。 您需要通过会话 ID 为每个用户(甚至是未经身份验证的用户)创建订阅。

假设每个用户只为他订阅一个唯一的队列:

stompClient.subscribe('/session/specific' + uuid, handler);

在服务器上,在用户订阅之前,您需要通知并为特定会话发送消息并保存到地图:

    @MessageMapping("/putAnonymousSession/{sessionId}")
    public void start(@DestinationVariable sessionId) throws Exception {
        anonymousUserSession.put(key, sessionId);
    }

之后,当您要向用户发送消息时,您需要:

messagingTemplate.convertAndSend("/session/specific" + key); 

但我真的不知道你想做什么,也不知道你将如何找到特定的会话(谁是匿名的)。

不需要创建特定的目的地,从 Spring 4.1 开始就已经完成了(参见 SPR-11309)。

鉴于用户订阅了 /user/queue/something queue,您可以向单个 session 发送消息:

As stated in the SimpMessageSendingOperations Javadoc,因为您的用户名实际上是一个 sessionId,您必须将其设置为 header,否则 DefaultUserDestinationResolver 将不会能够路由邮件并将其丢弃。

SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor
    .create(SimpMessageType.MESSAGE);
headerAccessor.setSessionId(sessionId);
headerAccessor.setLeaveMutable(true);

messagingTemplate.convertAndSendToUser(sessionId,"/queue/something", payload, 
    headerAccessor.getMessageHeaders());

您不需要为此对用户进行身份验证。

您只需在

中添加会话 ID
  • 服务器端

    convertAndSendToUser(sessionId,apiName,responseObject);

  • 客户端

    $stomp.subscribe('/user/+sessionId+'/apiName',handler);

注:
不要忘记在服务器端的端点添加 '/user'

我一直在努力解决同样的问题,并且提出的解决方案对我不起作用,因此我不得不采取不同的方法:

  1. 修改网络套接字配置,以便用户将通过 session ID 识别:
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws-endpoint")
                .setHandshakeHandler(new DefaultHandshakeHandler() {
                    
                    @Override
                    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
                        if (request instanceof ServletServerHttpRequest) {
                            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
                            HttpSession session = servletRequest.getServletRequest().getSession();
                            return new Principal() {
                                @Override
                                public String getName() {
                                    return session.getId();
                                }
                            };
                        } else {
                            return null;
                        }
                    }
                }).withSockJS();
    }
  1. 向那个 session id(没有 headers)发送消息:
    simpMessagingTemplate.convertAndSendToUser(sessionId, "/queue", payload);

最简单的方法是利用@SendToUser 中的广播参数。 文档:

Whether messages should be sent to all sessions associated with the user or only to the session of the input message being handled. By default, this is set to true in which case messages are broadcast to all sessions.

对于你的确切情况,它看起来像这样

    @MessageMapping("/start")
    @SendToUser(value = "/queue/jobend", broadcast = false)
    //...