使用持久队列 STOMP 向离线用户发送通知
Send notification to offline user with persistent queue STOMP
我有这个代码:客户端 javascript:
socket = new SockJS(context.backend + '/myWebSocketEndPoint');
stompClient = Stomp.over(socket);
stompClient.connect({},function (frame) {
stompClient.subscribe('/queue/'+clientId+'/notification', function(response){
alert(angular.fromJson(response.body));
});
});
在此代码中,客户端在连接时使用“/queue/”+ 他的客户端 ID +“/notification/”订阅接收通知。所以我为每个客户都有一个队列。我使用 stomp 和 sockjs
在我的服务器(Java + spring 启动)中,我有一个通知侦听器,当事件发布时,它会向所有客户端发送通知。所以我有:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer{
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/queue");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/myWebSocketEndPoint")
.setAllowedOrigins("*")
.withSockJS();
}
}
调用 MenuItemNotificationSender 向用户发送通知的 class MenuItemNotificationChannel。
@Component
public class MenuItemNotificationChannel extends AbstractNotificationChannel {
@Autowired
private MenuItemNotificationSender menuItemNotificationSender;
@Autowired
private UserRepository userRepository;
@Override
public void sendNotification(KitaiEvent<?> event, Map<String, Object> notificationConfiguration) throws Exception {
String menuItem = Optional.ofNullable((String) notificationConfiguration.get(MENU_ENTRY_KEY)).orElseThrow(IllegalArgumentException::new);
List<User> userList = userRepository.findAll();
for(User u: userList){
menuItemNotificationSender.sendNotification(new MenuItemDto(menuItem),u.getId());
}
MenuItemNotificationSender class 是:
@Component
public class MenuItemNotificationSender {
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
public MenuItemNotificationSender(SimpMessagingTemplate messagingTemplate){
this.messagingTemplate = messagingTemplate;
}
public void sendNotification(MenuItemDto menuItem,Long id) {
String address = "/queue/"+id+"/notification";
messagingTemplate.convertAndSend(address, menuItem);
}
}
此代码完美运行:通知发送给每个用户。但如果用户不在线,通知就会丢失。我的问题是:
我如何验证 whit stomp 哪些订阅有效,哪些不有效?? (如果我可以验证订阅是否有效,我就解决了我的问题,因为我为离线用户保存了通知,然后在他们登录时发送给他们)
我可以使用持久队列吗? (我读过一些关于它的内容,但我不明白我是否只能将它与 stomp 和 sockjs 一起使用)
对不起我的英语! :D
您可以在会话连接事件和会话断开事件上放置一个 spring 事件侦听器
我用 spring 4.3.4
测试了这个
@Component
public class WebSocketSessionListener
{
private static final Logger logger = LoggerFactory.getLogger(WebSocketSessionListener.class.getName());
private List<String> connectedClientId = new ArrayList<String>();
@EventListener
public void connectionEstablished(SessionConnectedEvent sce)
{
MessageHeaders msgHeaders = sce.getMessage().getHeaders();
Principal princ = (Principal) msgHeaders.get("simpUser");
StompHeaderAccessor sha = StompHeaderAccessor.wrap(sce.getMessage());
List<String> nativeHeaders = sha.getNativeHeader("userId");
if( nativeHeaders != null )
{
String userId = nativeHeaders.get(0);
connectedClientId.add(userId);
if( logger.isDebugEnabled() )
{
logger.debug("Connessione websocket stabilita. ID Utente "+userId);
}
}
else
{
String userId = princ.getName();
connectedClientId.add(userId);
if( logger.isDebugEnabled() )
{
logger.debug("Connessione websocket stabilita. ID Utente "+userId);
}
}
}
@EventListener
public void webSockectDisconnect(SessionDisconnectEvent sde)
{
MessageHeaders msgHeaders = sde.getMessage().getHeaders();
Principal princ = (Principal) msgHeaders.get("simpUser");
StompHeaderAccessor sha = StompHeaderAccessor.wrap(sde.getMessage());
List<String> nativeHeaders = sha.getNativeHeader("userId");
if( nativeHeaders != null )
{
String userId = nativeHeaders.get(0);
connectedClientId.remove(userId);
if( logger.isDebugEnabled() )
{
logger.debug("Disconnessione websocket. ID Utente "+userId);
}
}
else
{
String userId = princ.getName();
connectedClientId.remove(userId);
if( logger.isDebugEnabled() )
{
logger.debug("Disconnessione websocket. ID Utente "+userId);
}
}
}
public List<String> getConnectedClientId()
{
return connectedClientId;
}
public void setConnectedClientId(List<String> connectedClientId)
{
this.connectedClientId = connectedClientId;
}
}
当客户端连接时,您在客户端 ID 列表中添加客户端 ID;当它断开连接时,您将其删除
然后你可以在你想检查客户端是否处于活动状态或更少的地方注入这个bean或其列表,然后你可以检查客户端ID是否在你可以发送消息的连接的客户端ID之间,否则你必须保存并稍后重新发送
在客户端你可以这样做:
var socket = new SockJS('/ws');
stompClient = Stomp.over(socket);
stompClient.connect({userId:"customUserId"}, function (frame) {
});
安杰洛
为什么不使用下面的一些事件,您可以将 类 导出到不同的文件并使用 SessionConnectedEvent
和 SessionDisconnectEvent
或 SessionSubscribeEvent
和 SessionUnsubscribeEvent
。
在这里查看文档 http://docs.spring.io/spring/docs/current/spring-framework-reference/html/websocket.html#websocket-stomp-appplication-context-events
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionConnectedEvent;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
import org.springframework.web.socket.messaging.SessionSubscribeEvent;
import org.springframework.web.socket.messaging.SessionUnsubscribeEvent;
@Component
public class SessionConnectedListener extends SessionsListener implements ApplicationListener<SessionConnectedEvent> {
@Override
public void onApplicationEvent(SessionConnectedEvent event) {
users.add(event.getUser().getName());
}
}
@Component
class SessionDisconnectListener extends SessionsListener implements ApplicationListener<SessionDisconnectEvent> {
@Override
public void onApplicationEvent(SessionDisconnectEvent event) {
users.remove(event.getUser().getName());
}
}
@Component
class SessionSubscribeListener extends SessionsListener implements ApplicationListener<SessionSubscribeEvent> {
@Override
public void onApplicationEvent(SessionSubscribeEvent event) {
users.add(event.getUser().getName());
}
}
@Component
class SessionUnsubscribeListener extends SessionsListener implements ApplicationListener<SessionUnsubscribeEvent> {
@Override
public void onApplicationEvent(SessionUnsubscribeEvent event) {
users.remove(event.getUser().getName());
}
}
class SessionsListener {
protected List<String> users = Collections.synchronizedList(new LinkedList<String>());
public List<String> getUsers() {
return users;
}
}
并更改您的代码:
@Autowired
private SessionsListener sessionsListener;
@Override
public void sendNotification(KitaiEvent<?> event, Map<String, Object> notificationConfiguration) throws Exception {
String menuItem = Optional.ofNullable((String) notificationConfiguration.get(MENU_ENTRY_KEY)).orElseThrow(IllegalArgumentException::new);
List<String> userList = sessionsListener.getUsers();
for(String u: userList){
menuItemNotificationSender.sendNotification(new MenuItemDto(menuItem),u);
}
我有这个代码:客户端 javascript:
socket = new SockJS(context.backend + '/myWebSocketEndPoint');
stompClient = Stomp.over(socket);
stompClient.connect({},function (frame) {
stompClient.subscribe('/queue/'+clientId+'/notification', function(response){
alert(angular.fromJson(response.body));
});
});
在此代码中,客户端在连接时使用“/queue/”+ 他的客户端 ID +“/notification/”订阅接收通知。所以我为每个客户都有一个队列。我使用 stomp 和 sockjs
在我的服务器(Java + spring 启动)中,我有一个通知侦听器,当事件发布时,它会向所有客户端发送通知。所以我有:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer{
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/queue");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/myWebSocketEndPoint")
.setAllowedOrigins("*")
.withSockJS();
}
}
调用 MenuItemNotificationSender 向用户发送通知的 class MenuItemNotificationChannel。
@Component
public class MenuItemNotificationChannel extends AbstractNotificationChannel {
@Autowired
private MenuItemNotificationSender menuItemNotificationSender;
@Autowired
private UserRepository userRepository;
@Override
public void sendNotification(KitaiEvent<?> event, Map<String, Object> notificationConfiguration) throws Exception {
String menuItem = Optional.ofNullable((String) notificationConfiguration.get(MENU_ENTRY_KEY)).orElseThrow(IllegalArgumentException::new);
List<User> userList = userRepository.findAll();
for(User u: userList){
menuItemNotificationSender.sendNotification(new MenuItemDto(menuItem),u.getId());
}
MenuItemNotificationSender class 是:
@Component
public class MenuItemNotificationSender {
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
public MenuItemNotificationSender(SimpMessagingTemplate messagingTemplate){
this.messagingTemplate = messagingTemplate;
}
public void sendNotification(MenuItemDto menuItem,Long id) {
String address = "/queue/"+id+"/notification";
messagingTemplate.convertAndSend(address, menuItem);
}
}
此代码完美运行:通知发送给每个用户。但如果用户不在线,通知就会丢失。我的问题是:
我如何验证 whit stomp 哪些订阅有效,哪些不有效?? (如果我可以验证订阅是否有效,我就解决了我的问题,因为我为离线用户保存了通知,然后在他们登录时发送给他们)
我可以使用持久队列吗? (我读过一些关于它的内容,但我不明白我是否只能将它与 stomp 和 sockjs 一起使用)
对不起我的英语! :D
您可以在会话连接事件和会话断开事件上放置一个 spring 事件侦听器 我用 spring 4.3.4
测试了这个@Component
public class WebSocketSessionListener
{
private static final Logger logger = LoggerFactory.getLogger(WebSocketSessionListener.class.getName());
private List<String> connectedClientId = new ArrayList<String>();
@EventListener
public void connectionEstablished(SessionConnectedEvent sce)
{
MessageHeaders msgHeaders = sce.getMessage().getHeaders();
Principal princ = (Principal) msgHeaders.get("simpUser");
StompHeaderAccessor sha = StompHeaderAccessor.wrap(sce.getMessage());
List<String> nativeHeaders = sha.getNativeHeader("userId");
if( nativeHeaders != null )
{
String userId = nativeHeaders.get(0);
connectedClientId.add(userId);
if( logger.isDebugEnabled() )
{
logger.debug("Connessione websocket stabilita. ID Utente "+userId);
}
}
else
{
String userId = princ.getName();
connectedClientId.add(userId);
if( logger.isDebugEnabled() )
{
logger.debug("Connessione websocket stabilita. ID Utente "+userId);
}
}
}
@EventListener
public void webSockectDisconnect(SessionDisconnectEvent sde)
{
MessageHeaders msgHeaders = sde.getMessage().getHeaders();
Principal princ = (Principal) msgHeaders.get("simpUser");
StompHeaderAccessor sha = StompHeaderAccessor.wrap(sde.getMessage());
List<String> nativeHeaders = sha.getNativeHeader("userId");
if( nativeHeaders != null )
{
String userId = nativeHeaders.get(0);
connectedClientId.remove(userId);
if( logger.isDebugEnabled() )
{
logger.debug("Disconnessione websocket. ID Utente "+userId);
}
}
else
{
String userId = princ.getName();
connectedClientId.remove(userId);
if( logger.isDebugEnabled() )
{
logger.debug("Disconnessione websocket. ID Utente "+userId);
}
}
}
public List<String> getConnectedClientId()
{
return connectedClientId;
}
public void setConnectedClientId(List<String> connectedClientId)
{
this.connectedClientId = connectedClientId;
}
}
当客户端连接时,您在客户端 ID 列表中添加客户端 ID;当它断开连接时,您将其删除
然后你可以在你想检查客户端是否处于活动状态或更少的地方注入这个bean或其列表,然后你可以检查客户端ID是否在你可以发送消息的连接的客户端ID之间,否则你必须保存并稍后重新发送
在客户端你可以这样做:
var socket = new SockJS('/ws');
stompClient = Stomp.over(socket);
stompClient.connect({userId:"customUserId"}, function (frame) {
});
安杰洛
为什么不使用下面的一些事件,您可以将 类 导出到不同的文件并使用 SessionConnectedEvent
和 SessionDisconnectEvent
或 SessionSubscribeEvent
和 SessionUnsubscribeEvent
。
在这里查看文档 http://docs.spring.io/spring/docs/current/spring-framework-reference/html/websocket.html#websocket-stomp-appplication-context-events
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionConnectedEvent;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
import org.springframework.web.socket.messaging.SessionSubscribeEvent;
import org.springframework.web.socket.messaging.SessionUnsubscribeEvent;
@Component
public class SessionConnectedListener extends SessionsListener implements ApplicationListener<SessionConnectedEvent> {
@Override
public void onApplicationEvent(SessionConnectedEvent event) {
users.add(event.getUser().getName());
}
}
@Component
class SessionDisconnectListener extends SessionsListener implements ApplicationListener<SessionDisconnectEvent> {
@Override
public void onApplicationEvent(SessionDisconnectEvent event) {
users.remove(event.getUser().getName());
}
}
@Component
class SessionSubscribeListener extends SessionsListener implements ApplicationListener<SessionSubscribeEvent> {
@Override
public void onApplicationEvent(SessionSubscribeEvent event) {
users.add(event.getUser().getName());
}
}
@Component
class SessionUnsubscribeListener extends SessionsListener implements ApplicationListener<SessionUnsubscribeEvent> {
@Override
public void onApplicationEvent(SessionUnsubscribeEvent event) {
users.remove(event.getUser().getName());
}
}
class SessionsListener {
protected List<String> users = Collections.synchronizedList(new LinkedList<String>());
public List<String> getUsers() {
return users;
}
}
并更改您的代码:
@Autowired
private SessionsListener sessionsListener;
@Override
public void sendNotification(KitaiEvent<?> event, Map<String, Object> notificationConfiguration) throws Exception {
String menuItem = Optional.ofNullable((String) notificationConfiguration.get(MENU_ENTRY_KEY)).orElseThrow(IllegalArgumentException::new);
List<String> userList = sessionsListener.getUsers();
for(String u: userList){
menuItemNotificationSender.sendNotification(new MenuItemDto(menuItem),u);
}