如何使用 spring websocket 向自定义用户发送自定义消息?
How to send custom message to custom user with spring websocket?
我是 spring websocket 的新手。我想将产品更改发送给客户。为此,我想按如下方式进行:
客户端创建套接字连接并订阅目的地:
var socket = new SockJS('/websocket');
var stompClient = Stomp.over(socket);
stompClient.connect({}, function (frame) {
stompClient.subscribe('/product/changes', function (scoredata) {
// We received product changes
});
});
//Send Ajax request and say server I want to know product with id=5 changes.
sendAjaxRequest(5);
我已将 spring 应用配置如下:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/product/");
registry.setApplicationDestinationPrefixes("/app");
}
}
现在我需要以下方法:
@RestController
public class ProductController {
@GetMapping("product-{id}")
public void startSubscribe(@PathVariable("id") Long id) {
// register current websocket session with product id and
// then with convertAndSendToUser send changes to current user.
}
}
我该如何实施?
Spring documentation 是学习网络套接字概念的良好起点。
要发送给客户端,您可以使用 SimpMessageSendingOperations.
@Autowired
private SimpMessageSendingOperations messageSendingOperations;
从控制器方法,消息可以通过如下方式发送:
messageSendingOperations.convertAndSendToUser(websocketUserId, "/product/changes", messageObject);
我的第一个问题是,当您成功将 websockets 与 stomp 集成后,为什么还要尝试向 rest 控制器发送 http 请求?
如果我对你的用例理解正确,我能想到的atm应该有三种解决方案。
解决方案 1(套接字会话 ↔ 产品 ID)
您可以通过打开的 websocket 连接将您的请求直接从您的客户端发送到服务器。 Spring 然后可以确定哪个 Websocket 会话进行了调用,您可以实现您的业务逻辑。您需要激活另一个名为“/queue”的代理,并指定订阅不用于广播时所需的用户目标前缀。在客户端,您还必须更改您的订阅路径。最后,您必须创建一个用 @Controller 注释的 class,其中包含您的消息映射以从连接的客户端接收消息。
服务器配置
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/queue", "/product"); // <- added "/queue"
registry.setApplicationDestinationPrefixes("/app");
registry.setUserDestinationPrefix("/user");
}
}
服务器控制器
@Controller
public class WebSocketContoller{
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;
@MessageMapping("/product/register")
public void register(@Payload Long productId, @Header("simpSessionId") String sessionId) {
// register current websocket session with product id and
// then with convertAndSendToUser send changes to current user.
// Example of how to send a message to the user using the sessionId
String response = "This could also be one of your product objects of type Product";
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
headerAccessor.setSessionId(sessionId);
headerAccessor.setLeaveMutable(true);
messagingTemplate.convertAndSendToUser(sessionId,"/queue/product/changes", response, headerAccessor.getMessageHeaders());
}
}
客户端订阅更改
stompClient.subscribe('/user/queue/product/changes', function (scoredata) {
// We received product changes
});
有关详细信息,您还可以查看此答案:
解决方案 2(主体 ↔ 产品 ID)
然而,如果你真的想考虑使用rest controller来开始注册你的进程,或者它只是不符合你的要求,你应该看看下面的link。 Spring 还能够通过公开的 SimpUserRegistry bean 跟踪活动的 websocket 会话及其用户。但是,您需要根据应用程序的安全性为客户端输入通道配置自定义 ChannelInterceptor 适配器,以确定用户。
查看此答案以获取详细信息和代码示例:
解决方案 3(产品 ID 主题)
您还可以订阅特定的产品 ID 主题,这样您甚至不需要知道哪个用户希望收到有关特定产品更改的通知。
客户端订阅更改
//e.g if you want to be notified about changes for products with id 5
stompClient.subscribe('/product/changes/5', function (scoredata) {
// We received product changes
});
服务器服务示例
@Service
public class WebSocketProductService{
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;
// This would be the method which should inform your clients about specific product
// changes, instead of the String parameters a Product object should be used instead,
// you have to call this method yourself on product changes or schedule it or sth.
public void sendProductChange(String product, String productId) {
this.simpMessagingTemplate.convertAndSend("/product/changes/"+productId, product);
}
}
服务器控制器
如果您想管理产品 ID 订阅列表,则需要。就像解决方案 1 中解释的那样,您需要一个用 @Controller 注释的 class,它包含一个用 @SubscribeMapping 注释的方法。如果客户端尝试订阅指定路径,则会调用此方法。
@Controller
public class WebSocketContoller{
@SubscribeMapping("/product/changes/{productId}")
public void productIdSubscription(@DestinationVariable Long productId) {
//Manage your product id subscription list e.g.
}
}
如果您只想在用户请求时向用户发送产品更新,那么您可以使用普通的 HTTP 请求。但我知道您想根据特定于用户的业务逻辑推送通知。您还必须实施 Spring Security
来验证您的用户。
解决方案
我建议使用 user_product_updates( user_id, product_id)
table 在您的后端添加此业务逻辑 - 每行对应一个 product_id
,user_id
的用户想要订阅到更新:
@GetMapping("product-{id}")
public void startSubscribe(@PathVariable("id") Long id) {
// Save this custom setting into your models
}
现在您可以运行一个预定的后端作业(可以是一个cron作业,基于您的推送通知) 向您的用户发送更新:
@Autowired
org.springframework.messaging.simp.SimpMessagingTemplate simpMessagingTemplate;
@Scheduled(cron = "0 0 1 * * ?") // based on your business logic (say daily at 12:01 am)
public void scheduleTaskUsingCronExpression() {
// loop through user_product_updates table and construct "data"
// username is from your spring security username (principal.getName())
simpMessagingTemplate.convertAndSendToUser(username, "/queue/products", data);
}
展望未来,您可能需要添加一些缓存来优化它们(尤其是从 product_id
获取 产品信息),以便 运行 顺利进行。
总结
您的网络套接字配置:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app")
.setUserDestinationPrefix("/user")
.enableSimpleBroker("/topic", "/queue", "/product");
}
}
前端应用程序中的侦听器可能如下所示:
that.stompClient.subscribe("/user/queue/products", (message) => {
if (message.body) {
// We received product changes
}
});
用户将注册产品更新:
@GetMapping("product-{id}")
public void startSubscribe(@PathVariable("id") Long id) {
// Save to your persistence module
// (that the particular user wants updates from such-and-such products)
}
后端调度程序作业将在可用时发送更新:
@Scheduled(cron = "0 0 1 * * ?") // based on your business logic
public void scheduleTaskUsingCronExpression() {
// loop through user_product_updates table and construct "data"
// username is from your spring security username (principal.getName())
template.convertAndSendToUser(username, "/queue/products", data);
}
我是 spring websocket 的新手。我想将产品更改发送给客户。为此,我想按如下方式进行: 客户端创建套接字连接并订阅目的地:
var socket = new SockJS('/websocket');
var stompClient = Stomp.over(socket);
stompClient.connect({}, function (frame) {
stompClient.subscribe('/product/changes', function (scoredata) {
// We received product changes
});
});
//Send Ajax request and say server I want to know product with id=5 changes.
sendAjaxRequest(5);
我已将 spring 应用配置如下:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/product/");
registry.setApplicationDestinationPrefixes("/app");
}
}
现在我需要以下方法:
@RestController
public class ProductController {
@GetMapping("product-{id}")
public void startSubscribe(@PathVariable("id") Long id) {
// register current websocket session with product id and
// then with convertAndSendToUser send changes to current user.
}
}
我该如何实施?
Spring documentation 是学习网络套接字概念的良好起点。 要发送给客户端,您可以使用 SimpMessageSendingOperations.
@Autowired
private SimpMessageSendingOperations messageSendingOperations;
从控制器方法,消息可以通过如下方式发送:
messageSendingOperations.convertAndSendToUser(websocketUserId, "/product/changes", messageObject);
我的第一个问题是,当您成功将 websockets 与 stomp 集成后,为什么还要尝试向 rest 控制器发送 http 请求? 如果我对你的用例理解正确,我能想到的atm应该有三种解决方案。
解决方案 1(套接字会话 ↔ 产品 ID)
您可以通过打开的 websocket 连接将您的请求直接从您的客户端发送到服务器。 Spring 然后可以确定哪个 Websocket 会话进行了调用,您可以实现您的业务逻辑。您需要激活另一个名为“/queue”的代理,并指定订阅不用于广播时所需的用户目标前缀。在客户端,您还必须更改您的订阅路径。最后,您必须创建一个用 @Controller 注释的 class,其中包含您的消息映射以从连接的客户端接收消息。
服务器配置
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/queue", "/product"); // <- added "/queue"
registry.setApplicationDestinationPrefixes("/app");
registry.setUserDestinationPrefix("/user");
}
}
服务器控制器
@Controller
public class WebSocketContoller{
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;
@MessageMapping("/product/register")
public void register(@Payload Long productId, @Header("simpSessionId") String sessionId) {
// register current websocket session with product id and
// then with convertAndSendToUser send changes to current user.
// Example of how to send a message to the user using the sessionId
String response = "This could also be one of your product objects of type Product";
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
headerAccessor.setSessionId(sessionId);
headerAccessor.setLeaveMutable(true);
messagingTemplate.convertAndSendToUser(sessionId,"/queue/product/changes", response, headerAccessor.getMessageHeaders());
}
}
客户端订阅更改
stompClient.subscribe('/user/queue/product/changes', function (scoredata) {
// We received product changes
});
有关详细信息,您还可以查看此答案:
解决方案 2(主体 ↔ 产品 ID)
然而,如果你真的想考虑使用rest controller来开始注册你的进程,或者它只是不符合你的要求,你应该看看下面的link。 Spring 还能够通过公开的 SimpUserRegistry bean 跟踪活动的 websocket 会话及其用户。但是,您需要根据应用程序的安全性为客户端输入通道配置自定义 ChannelInterceptor 适配器,以确定用户。
查看此答案以获取详细信息和代码示例:
解决方案 3(产品 ID 主题)
您还可以订阅特定的产品 ID 主题,这样您甚至不需要知道哪个用户希望收到有关特定产品更改的通知。
客户端订阅更改
//e.g if you want to be notified about changes for products with id 5
stompClient.subscribe('/product/changes/5', function (scoredata) {
// We received product changes
});
服务器服务示例
@Service
public class WebSocketProductService{
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;
// This would be the method which should inform your clients about specific product
// changes, instead of the String parameters a Product object should be used instead,
// you have to call this method yourself on product changes or schedule it or sth.
public void sendProductChange(String product, String productId) {
this.simpMessagingTemplate.convertAndSend("/product/changes/"+productId, product);
}
}
服务器控制器
如果您想管理产品 ID 订阅列表,则需要。就像解决方案 1 中解释的那样,您需要一个用 @Controller 注释的 class,它包含一个用 @SubscribeMapping 注释的方法。如果客户端尝试订阅指定路径,则会调用此方法。
@Controller
public class WebSocketContoller{
@SubscribeMapping("/product/changes/{productId}")
public void productIdSubscription(@DestinationVariable Long productId) {
//Manage your product id subscription list e.g.
}
}
如果您只想在用户请求时向用户发送产品更新,那么您可以使用普通的 HTTP 请求。但我知道您想根据特定于用户的业务逻辑推送通知。您还必须实施 Spring Security
来验证您的用户。
解决方案
我建议使用 user_product_updates( user_id, product_id)
table 在您的后端添加此业务逻辑 - 每行对应一个 product_id
,user_id
的用户想要订阅到更新:
@GetMapping("product-{id}")
public void startSubscribe(@PathVariable("id") Long id) {
// Save this custom setting into your models
}
现在您可以运行一个预定的后端作业(可以是一个cron作业,基于您的推送通知) 向您的用户发送更新:
@Autowired
org.springframework.messaging.simp.SimpMessagingTemplate simpMessagingTemplate;
@Scheduled(cron = "0 0 1 * * ?") // based on your business logic (say daily at 12:01 am)
public void scheduleTaskUsingCronExpression() {
// loop through user_product_updates table and construct "data"
// username is from your spring security username (principal.getName())
simpMessagingTemplate.convertAndSendToUser(username, "/queue/products", data);
}
展望未来,您可能需要添加一些缓存来优化它们(尤其是从 product_id
获取 产品信息),以便 运行 顺利进行。
总结
您的网络套接字配置:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app")
.setUserDestinationPrefix("/user")
.enableSimpleBroker("/topic", "/queue", "/product");
}
}
前端应用程序中的侦听器可能如下所示:
that.stompClient.subscribe("/user/queue/products", (message) => {
if (message.body) {
// We received product changes
}
});
用户将注册产品更新:
@GetMapping("product-{id}")
public void startSubscribe(@PathVariable("id") Long id) {
// Save to your persistence module
// (that the particular user wants updates from such-and-such products)
}
后端调度程序作业将在可用时发送更新:
@Scheduled(cron = "0 0 1 * * ?") // based on your business logic
public void scheduleTaskUsingCronExpression() {
// loop through user_product_updates table and construct "data"
// username is from your spring security username (principal.getName())
template.convertAndSendToUser(username, "/queue/products", data);
}