为集群环境创建列表
Creating a list for the clustered environment
在我的应用程序中有一个列表 publisherPostListenerList
,它从 RabbitMQ 队列接收实时用户 posts 并发送到 subscribers/consumers。该列表是 ApplicationListener
class 的 属性,它监听 pubsub 队列的事件。下面的控制器方法通过 getter 方法获取列表元素,并根据逻辑将 post 推送给订阅者。
流程如下
用户写入 post -> Post 进入 DB + 队列 -> 来自队列的消息被添加到一个列表中,该列表 publisherPostListenerList
被推送给订阅者用户。
正如我们所见,publisherPostListenerList
是 n 个并发请求的公共列表,因为 ApplicationListener
是一个单例。对于单个实例,设置工作正常,但在集群环境中会失败,因为每个节点都有自己的 publisherPostListenerList
列表。
我该如何处理这种情况?我不能使 ApplicationListener
class 无状态 我需要列表来存储从队列接收到的 post 元素。我是否将列表放在分布式内存缓存中?或者有什么其他的常规方式?
ApplicationListener.java
@Component
public class ApplicationEventListener {
private List<Post> publisherPostListenerList = new CopyOnWriteArrayList<Post>();
private static final Logger logger = Logger.getLogger(ApplicationEventListener.class);
@EventListener
public void postSubmissionEventHandler(PostSubmissionEvent event) throws IOException {
Post post = event.getPost();
logger.debug("application published user post received " + post);
publisherPostListenerList.add(post);
}
public List<Post> getPublisherPostListenerList() {
return publisherPostListenerList;
}
public void setPublisherPostListenerList(List<Post> publisherPostListenerList) {
this.publisherPostListenerList = publisherPostListenerList;
}
}
向订阅者推送消息的控制器方法
@RequestMapping(value="/getRealTimeServerPushUserPosts")
public SseEmitter getRealTimeServerPushUserPosts(@RequestParam("userId") int userId){
SseEmitter sseEmitter = new SseEmitter();
CustomUserDetail myUserDetails = currentUserAccessor.getCurrentLoggedInUser();
User loggedInUser=myUserDetails.getUser();
List<Integer> userPublisherIDList = this.userService.loadUserPublisherIdListWhichLoggedInUserFollows(loggedInUser);
List<Post> postList =eventListener.getPublisherPostListenerList();
for(Integer userPublisherId : userPublisherIDList){
for(Post post:postList){
if((userPublisherId.intValue()) == (post.getUser().getUserId().intValue())){
try {
sseEmitter.send(post);
postList.remove(post); //removes the post for all the subscribers as the list acts as a global list.
} catch (IOException e) {
logger.error(e);
}
}
}
}
return sseEmitter;
}
您可以使用 Hazelcast IList
。它遵循 j.u.List
语义,适用于分布式/集群环境。
您可以找到文档 here and examples here。
另一种选择是使用分布式地图又名 IMap
.
如果您对实施细节有任何具体问题,请告诉我。
谢谢
ApplicationListener 旨在用于处理应用程序上下文中的事件。为了解决您的问题,您可能需要部署一些消息技术(JMS 主题)。
您的 postSubmissionEventHandler() 不会将 PostSubmmittion 添加到列表,而是创建并发送一条消息来表示 JMS 主题上的事件。
现在在您的控制器方法中,您可以从主题中读取消息,然后将它们发布给连接的用户。
希望对您有所帮助
将列表放入应用程序的缓存内存中可能会导致几个问题(例如低可伸缩性...)。为什么不使用像 Redis 这样的内存数据库呢?通过这种方式,您可以扩展您的应用程序,并且所有实例都可以共享同一个数据库。您还保证数据的完整性。
在我的应用程序中有一个列表 publisherPostListenerList
,它从 RabbitMQ 队列接收实时用户 posts 并发送到 subscribers/consumers。该列表是 ApplicationListener
class 的 属性,它监听 pubsub 队列的事件。下面的控制器方法通过 getter 方法获取列表元素,并根据逻辑将 post 推送给订阅者。
流程如下
用户写入 post -> Post 进入 DB + 队列 -> 来自队列的消息被添加到一个列表中,该列表 publisherPostListenerList
被推送给订阅者用户。
正如我们所见,publisherPostListenerList
是 n 个并发请求的公共列表,因为 ApplicationListener
是一个单例。对于单个实例,设置工作正常,但在集群环境中会失败,因为每个节点都有自己的 publisherPostListenerList
列表。
我该如何处理这种情况?我不能使 ApplicationListener
class 无状态 我需要列表来存储从队列接收到的 post 元素。我是否将列表放在分布式内存缓存中?或者有什么其他的常规方式?
ApplicationListener.java
@Component
public class ApplicationEventListener {
private List<Post> publisherPostListenerList = new CopyOnWriteArrayList<Post>();
private static final Logger logger = Logger.getLogger(ApplicationEventListener.class);
@EventListener
public void postSubmissionEventHandler(PostSubmissionEvent event) throws IOException {
Post post = event.getPost();
logger.debug("application published user post received " + post);
publisherPostListenerList.add(post);
}
public List<Post> getPublisherPostListenerList() {
return publisherPostListenerList;
}
public void setPublisherPostListenerList(List<Post> publisherPostListenerList) {
this.publisherPostListenerList = publisherPostListenerList;
}
}
向订阅者推送消息的控制器方法
@RequestMapping(value="/getRealTimeServerPushUserPosts")
public SseEmitter getRealTimeServerPushUserPosts(@RequestParam("userId") int userId){
SseEmitter sseEmitter = new SseEmitter();
CustomUserDetail myUserDetails = currentUserAccessor.getCurrentLoggedInUser();
User loggedInUser=myUserDetails.getUser();
List<Integer> userPublisherIDList = this.userService.loadUserPublisherIdListWhichLoggedInUserFollows(loggedInUser);
List<Post> postList =eventListener.getPublisherPostListenerList();
for(Integer userPublisherId : userPublisherIDList){
for(Post post:postList){
if((userPublisherId.intValue()) == (post.getUser().getUserId().intValue())){
try {
sseEmitter.send(post);
postList.remove(post); //removes the post for all the subscribers as the list acts as a global list.
} catch (IOException e) {
logger.error(e);
}
}
}
}
return sseEmitter;
}
您可以使用 Hazelcast IList
。它遵循 j.u.List
语义,适用于分布式/集群环境。
您可以找到文档 here and examples here。
另一种选择是使用分布式地图又名 IMap
.
如果您对实施细节有任何具体问题,请告诉我。
谢谢
ApplicationListener 旨在用于处理应用程序上下文中的事件。为了解决您的问题,您可能需要部署一些消息技术(JMS 主题)。 您的 postSubmissionEventHandler() 不会将 PostSubmmittion 添加到列表,而是创建并发送一条消息来表示 JMS 主题上的事件。 现在在您的控制器方法中,您可以从主题中读取消息,然后将它们发布给连接的用户。
希望对您有所帮助
将列表放入应用程序的缓存内存中可能会导致几个问题(例如低可伸缩性...)。为什么不使用像 Redis 这样的内存数据库呢?通过这种方式,您可以扩展您的应用程序,并且所有实例都可以共享同一个数据库。您还保证数据的完整性。