synchronized() 代码上的 ConcurrentModificationException - 这怎么可能?

ConcurrentModificationException on synchronized() code - how is it possible?

我有一个下面的 spring bean(稍作编辑)这个 bean 负责 adding/removing websocket 连接到内部连接 ​​(this.sockets)。新连接可能 added/removed 并且每次需要发送消息时都会迭代。

如您所见,sockets 字段上的每个操作都包含在一个 synchronized() 块中,它是私有的,不会在此 class 之外使用,每次使用此字段在我下面粘贴的代码中。

但我仍然不时收到 ConcurrentModificartionException 发生在 onApplicationEvent 调用的 for 循环中。这个调用可能确实被应用程序中的多个线程调用,但我的理解是,如果每个实际修改或迭代 this.socket 变量的调用都包装在同一个对象上的 synchronized() 块中,那么这些调用中的每一个都不能并行调用,必须相互“等待”。

如何在这段代码中抛出 ConcurrentModificationException?请记住,我正在尝试了解出现异常的原因,我并不是在寻找可以解决此问题的快速解决方案(因此 'why dont you use XXX here' 之类的答案实际上并没有为我解决任何问题 - 我我想找出我在代码中做错了什么)。

代码:

public class MyClass extends TextWebSocketHandler implements ApplicationListener<AppEventMessage>{

    private static final Logger log= LoggerFactory.getLogger(MyClass.class);

    private Set<WebSocketSession> sockets=new HashSet<>();


    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        super.afterConnectionEstablished(session);
        synchronized (this.sockets) {
            this.sockets.add(session);
        }
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        synchronized (this.sockets) {
            this.sockets.remove(session);
        }
    }

    @Override
    public void onApplicationEvent(AppEventMessage event) {
        synchronized (this.sockets) {
            for (WebSocketSession session : this.sockets) {
                try {
                    log.info("sendin");
                    session.sendMessage(new TextMessage(event.getMessage()));
                } catch (Throwable e) {
                    log.warn("error");
                }
            }
        }
    }
}

stacktrace 最相关的部分(加上我的评论)

java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
    at java.util.HashMap$KeyIterator.next(HashMap.java:1469)
    at ...MyClass.onApplicationEvent(ApplicationEventWebsocketHandler.java:46) <- (line 46 is the for(WebSocketSession ...) line)
    at ...MyClass.onApplicationEvent(ApplicationEventWebsocketHandler.java:19) <- (line 19 is the "public class Myclass" line - how it is possible?)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:163)

我没有立即看出是什么导致了 ConcurrentModificationException,但这里有一个可能有用的建议。而不是同步 this.sockets 在你的 class 中声明一个
static final Object myLock = new Object();
并同步。该对象将是唯一的且不可修改的。更好的候选人是锁。更好的是,阅读 classes Semaphore and Lock。它们提供更好的同步并且更易于维护

到目前为止同意你的诊断。还有第二种方法可以获得通常被忽视的 ConcurrentModificationException...

那些synchronized锁是re-entrant这意味着在一个方法中持有锁的线程可以访问所有方法。因此,导致您的 CME 的线程很可能是 for 循环 中 的回调,最终回到您的 connection-listener 方法中。

IMO,最有可能的情况是连接仅在尝试 sendMessage 时被检测为已关闭。

解决方案:有更聪明的解决方案,但这是一个不会出错的解决方案:

  // Take copy of sockets before iterating it to defend against self-induced ConcurrentModificationException
  for (WebSocketSession session : new HashSet<>(this.sockets)) {