当数据库中的某些内容被修改时,仅通过 WebSockets 通知特定用户

Notify only specific user(s) through WebSockets, when something is modified in the database

为了通过 WebSockets 通知所有用户,当在选定的 JPA 实体中修改某些内容时,我使用以下基本方法。

@ServerEndpoint("/Push")
public class Push {

    private static final Set<Session> sessions = new LinkedHashSet<Session>();

    @OnOpen
    public void onOpen(Session session) {
        sessions.add(session);
    }

    @OnClose
    public void onClose(Session session) {
        sessions.remove(session);
    }

    private static JsonObject createJsonMessage(String message) {
        return JsonProvider.provider().createObjectBuilder().add("jsonMessage", message).build();
    }

    public static void sendAll(String text) {
        synchronized (sessions) {
            String message = createJsonMessage(text).toString();

            for (Session session : sessions) {
                if (session.isOpen()) {
                    session.getAsyncRemote().sendText(message);
                }
            }
        }
    }
}

修改选定的 JPA 实体时,将引发相应的 CDI 事件,该事件将由以下 CDI 观察者观察。

@Typed
public final class EventObserver {

    private EventObserver() {}

    public void onEntityChange(@Observes EntityChangeEvent event) {
        Push.sendAll("updateModel");
    }
}

observer/consumer 调用 WebSockets 端点中定义的静态方法 Push#sendAll(),该方法将 JSON 消息作为通知发送给所有关联的 users/connections.

当只通知选定的用户时,sendAll() 方法中的逻辑需要以某种方式修改。

建立初始握手后,HttpSession可以访问this答案中所述,但仍然不足以通过两颗子弹完成上述任务。由于它在第一次握手请求时可用,因此之后为该会话设置的任何属性在服务器端点将不可用,换句话说,在建立握手后设置的任何会话属性将不可用。

如上所述,仅通知选定用户的最acceptable/canonical 方式是什么? sendAll() 方法中的某些条件语句或某处的其他内容是必需的。看来它必须做一些事情,而不仅仅是用户的 HttpSession.

我使用 GlassFish Server 4.1 / Java EE 7.

会话?

Since it is available when the first handshake request is made, any attribute set to that session afterwards will not be available in the server endpoint i.e. in other words, any session attribute set after a handshake is established will not be available

看来你是被“session”这个词的歧义给咬伤了。会话的生命周期取决于上下文和客户端。 Websocket (WS) 会话的生命周期与 HTTP 会话不同。就像 EJB 会话的生命周期与 HTTP 会话的生命周期不同一样。就像一个遗留的 Hibernate 会话没有与 HTTP 会话相同的生命周期。等等。您可能已经了解的 HTTP 会话在此处进行了解释 How do servlets work? Instantiation, sessions, shared variables and multithreading. The EJB session is explained here JSF request scoped bean keeps recreating new Stateful session beans on every request?

WebSocket 生命周期

WS 会话绑定到 HTML 文档表示的上下文。客户端基本上就是JavaScript代码。当 JavaScript 执行 new WebSocket(url) 时,WS 会话开始。当 JavaScript 在 WebSocket 实例上显式调用 close() 函数时,或者当关联的 HTML 文档由于页面导航(单击 link/bookmark 或修改浏览器地址栏中的 URL),或刷新页面,或关闭浏览器 tab/window。请注意,您可以在完全相同的 DOM 中创建多个 WebSocket 实例,通常每个实例具有不同的 URL 路径或查询字符串参数。

每次 WS 会话开始时(即每次 JavaScript 执行 var ws = new WebSocket(url); 时),这将触发握手请求,您因此可以通过以下方式访问关联的 HTTP 会话Configurator class 正如您已经发现的:

public class ServletAwareConfigurator extends Configurator {

    @Override
    public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {
        HttpSession httpSession = (HttpSession) request.getHttpSession();
        config.getUserProperties().put("httpSession", httpSession);
    }

}

因此,这不会像您预期的那样在每个 HTTP 会话或 HTML 文档中只调用一次。每次创建 new WebSocket(url) 时都会调用此方法。

然后将调用 @ServerEndpoint annotated class will be created and its @OnOpen 注释方法的全新实例。如果您熟悉 JSF/CDI 托管 beans,只需将 class 视为 @ViewScoped 并将方法视为 @PostConstruct.

@ServerEndpoint(value="/push", configurator=ServletAwareConfigurator.class)
public class PushEndpoint {

    private Session session;
    private EndpointConfig config;

    @OnOpen
    public void onOpen(Session session, EndpointConfig config) {
        this.session = session;
        this.config = config;
    }

    @OnMessage
    public void onMessage(String message) {
        // ...
    }

    @OnError
    public void onError(Throwable exception) {
        // ...
    }

    @OnClose
    public void onClose(CloseReason reason) {
        // ...
    }

}

请注意,此 class 与例如不属于应用程序范围的 servlet。它基本上是 WS 会话范围的。所以每个新的 WS 会话都有自己的实例。这就是为什么您可以安全地将 SessionEndpointConfig 分配为实例变量的原因。根据 class 设计(例如抽象模板等),您可以在必要时添加回 Session 作为所有其他 onXxx 方法的第一个参数。这个也支持。

@OnMessage annotated method will be invoked when JavaScript does webSocket.send("some message"). The @OnClose annotated method will be called when the WS session is closed. The exact close reason can if necessary be determined by close reason codes as available by CloseReason.CloseCodes enum. The @OnError注解方法将在抛出异常时被调用,通常作为 WS 连接上的 IO 错误(管道损坏、连接重置等)。

按登录用户收集 WS 会话

回到你只通知特定用户的具体功能需求,经过上面的解释你应该明白你可以安全地依赖 modifyHandshake() 从关联的 HTTP 会话中提取登录用户,每个时间,前提是 用户登录后创建 new WebSocket(url)

public class UserAwareConfigurator extends Configurator {

    @Override
    public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {
        HttpSession httpSession = (HttpSession) request.getHttpSession();
        User user = (User) httpSession.getAttribute("user");
        config.getUserProperties().put("user", user);
    }

}

在带有 @ServerEndpoint(configurator=UserAwareConfigurator.class) 的 WS 端点 class 中,您可以在 @OnOpen 注释方法中获取它,如下所示:

@OnOpen
public void onOpen(Session session, EndpointConfig config) {
    User user = (User) config.getUserProperties().get("user");
    // ...
}

你应该在应用程序范围内收集它们。您可以在端点 class 的 static 字段中收集它们。或者,更好的是,如果 WS 端点中的 CDI 支持在您的环境中没有被破坏(在 WildFly 中工作,而不在 Tomcat+Weld 中工作,不确定 GlassFish),那么只需将它们收集在应用程序范围的 CDI 托管 bean 中,您在端点 class.

中依次 @Inject

User 实例不是 null 时(即当用户登录时),请记住一个用户可以有多个 WS 会话。因此,您基本上需要将它们收集在一个 Map<User, Set<Session>> 结构中,或者可能是一个更细粒度的映射,它通过用户 ID 或 group/role 来映射它们,毕竟这样可以更容易地找到特定用户。这完全取决于最终要求。这至少是一个使用应用程序范围的 CDI 托管 bean 的启动示例:

@ApplicationScoped
public class PushContext {

    private Map<User, Set<Session>> sessions;

    @PostConstruct
    public void init() {
        sessions = new ConcurrentHashMap<>();
    }

    void add(Session session, User user) {
        sessions.computeIfAbsent(user, v -> ConcurrentHashMap.newKeySet()).add(session);
    }

    void remove(Session session) {
        sessions.values().forEach(v -> v.removeIf(e -> e.equals(session)));
    }

}
@ServerEndpoint(value="/push", configurator=UserAwareConfigurator.class)
public class PushEndpoint {

    @Inject
    private PushContext pushContext;

    @OnOpen
    public void onOpen(Session session, EndpointConfig config) {
        User user = (User) config.getUserProperties().get("user");
        pushContext.add(session, user);
    }

    @OnClose
    public void onClose(Session session) {
        pushContext.remove(session);
    }

}

最后,您可以向 PushContext 中的特定用户发送消息:

public void send(Set<User> users, String message) {
    Set<Session> userSessions;

    synchronized(sessions) {
        userSessions = sessions.entrySet().stream()
            .filter(e -> users.contains(e.getKey()))
            .flatMap(e -> e.getValue().stream())
            .collect(Collectors.toSet());
    }

    for (Session userSession : userSessions) {
        if (userSession.isOpen()) {
            userSession.getAsyncRemote().sendText(message);
        }
    }
}

作为 CDI 托管 bean 的 PushContext 具有额外的优势,即它可以注入任何其他 CDI 托管 bean,从而更容易集成。

触发关联用户的 CDI 事件

在您的 EntityListener 中,您最有可能根据您之前的相关问题 Real time updates from database using JSF/Java EE 触发 CDI 事件,您已经拥有更改后的实体,因此您应该能够找到用户通过他们在模型中的关系与之关联。

Notify only the user who is responsible for modifying the entity in question (it may be an admin user or a registered user who can modify something only after their successful login)

@PostUpdate
public void onChange(Entity entity) {
    Set<User> editors = entity.getEditors();
    beanManager.fireEvent(new EntityChangeEvent(editors));
}

Notify only specific user/s (not all). "Specific" means for example, when a post is voted up on this site, only the post owner is notified (the post may be voted up by any other user with sufficient privileges).

@PostUpdate
public void onChange(Entity entity) {
    User owner = entity.getOwner();
    beanManager.fireEvent(new EntityChangeEvent(Collections.singleton(owner)));
}

然后在CDI事件观察器中传递出去:

public void onEntityChange(@Observes EntityChangeEvent event) {
    pushContext.send(event.getUsers(), "message");
}

另请参阅: