如何正确使用 VAVR 集合是线程安全的?
How to correctly use VAVR collections to be thread safe?
VAVR 合集是 "immutable"。
那么,如果我有静态变量,例如,保存所有 WebSocket 会话,我将如何使用 VAVR 以使集合是线程安全的?
例如:
@ServerEndpoint("/actions")
public class DeviceWebSocketServer {
private static Set<Session> sessions = //???; // how should I initialize this?
@OnOpen
public void open(Session session) {
sessions = sessions.add(session); // is this OK???
}
@OnClose
public void close(Session session) {
sessions = sessions.remove(session); // is this OK??
}
}
您可以将不可变的 vavr 集合包装在可原子更新的 AtomicReference
中,并使用其中一种更新方法以原子方式更新对不可变集合的引用。
@ServerEndpoint("/actions")
public class DeviceWebSocketServer {
private static AtomicReference<Set<Session>> sessionsRef =
new AtomicReference<>(HashSet.empty());
@OnOpen
public void open(Session session) {
sessionsRef.updateAndGet(sessions -> sessions.add(session));
}
@OnClose
public void close(Session session) {
sessionsRef.updateAndGet(sessions -> sessions.remove(session));
}
}
如果您要在其他场景中使用它们,请务必阅读 AtomicReference
的 javadoc,因为需要遵守更新函数的一些要求以获得正确的行为。
对于此用例,您还可以考虑使用并发映射:
@ServerEndpoint("/actions")
public class DeviceWebSocketServer {
private static ConcurrentMap<String, Session> sessions = new ConcurrentHashMap<>();
@OnOpen
public void open(Session session) {
sessions = sessions.putIfAbsent(session.getId(), session);
}
@OnClose
public void close(Session session) {
sessions = sessions.remove(session.getId());
}
}
VAVR 合集是 "immutable"。
那么,如果我有静态变量,例如,保存所有 WebSocket 会话,我将如何使用 VAVR 以使集合是线程安全的?
例如:
@ServerEndpoint("/actions")
public class DeviceWebSocketServer {
private static Set<Session> sessions = //???; // how should I initialize this?
@OnOpen
public void open(Session session) {
sessions = sessions.add(session); // is this OK???
}
@OnClose
public void close(Session session) {
sessions = sessions.remove(session); // is this OK??
}
}
您可以将不可变的 vavr 集合包装在可原子更新的 AtomicReference
中,并使用其中一种更新方法以原子方式更新对不可变集合的引用。
@ServerEndpoint("/actions")
public class DeviceWebSocketServer {
private static AtomicReference<Set<Session>> sessionsRef =
new AtomicReference<>(HashSet.empty());
@OnOpen
public void open(Session session) {
sessionsRef.updateAndGet(sessions -> sessions.add(session));
}
@OnClose
public void close(Session session) {
sessionsRef.updateAndGet(sessions -> sessions.remove(session));
}
}
如果您要在其他场景中使用它们,请务必阅读 AtomicReference
的 javadoc,因为需要遵守更新函数的一些要求以获得正确的行为。
对于此用例,您还可以考虑使用并发映射:
@ServerEndpoint("/actions")
public class DeviceWebSocketServer {
private static ConcurrentMap<String, Session> sessions = new ConcurrentHashMap<>();
@OnOpen
public void open(Session session) {
sessions = sessions.putIfAbsent(session.getId(), session);
}
@OnClose
public void close(Session session) {
sessions = sessions.remove(session.getId());
}
}