动态添加 websocket 处理程序时的竞争条件

Race condition when dynamically adding websocket handlers

我正在用 netty 编写一个 websocket,我的代码中似乎存在竞争条件:

我有一个通道初始化程序,它构建的管道包括:

  ch.pipeline().addLast(new HttpServerCodec())
  ch.pipeline().addLast(new HttpObjectAggregator(65536))
  ch.pipeline().addLast(new MyServer())

而 MyServer 的工作方式如下:

      awareLogger.debug(log"upgrading to websocket")(logContext)
      ctx.pipeline()
         .addLast(new WebSocketServerProtocolHandler(route, true))
         .addLast(new WebSocketFrameAggregator(65536))
         .addLast(new MyWebsocketLogic(logContext))
      ctx.fireChannelRead(httpRequest)
      val _ = awareLogger.debug(log"upgraded to websocket")(logContext)

它正在尝试 fireChannelRead(httpRequest) 希望 WebSocketServerProtocolHandler 拦截它并完成握手。

我的问题是 - httpRequest 有时似乎一直传播到 MyWebsocketLogic 处理程序并且无法建立连接和握手。

我做错了什么吗?这几乎就像我在代码中遇到了某种竞争条件。

问题是:

      awareLogger.debug(log"upgrading to websocket")(logContext)
      ctx.pipeline()
         .addLast(new WebSocketServerProtocolHandler(route, true))
         .addLast(new WebSocketFrameAggregator(65536))
         .addLast(new MyWebsocketLogic(logContext))
      ctx.fireChannelRead(httpRequest)
      val _ = awareLogger.debug(log"upgraded to websocket")(logContext)

在与分配给给定 ctx 的线程不同的线程中被调用。

我能够通过应用上面 Norman 的建议来解决这个问题,即将管道修改切换到 channelEventLoop,意思是:

      ctx.channel().eventLoop().execute { () =>
        val _ = ctx
          .pipeline()
          .addLast(new WebSocketServerProtocolHandler(route, true))
          .addLast(new WebSocketFrameAggregator(65536))
          .addLast(buildWebsocketHandler(logContext, connectionHandler))
        val _ = ctx.fireChannelRead(msg)
      }

这似乎很有效