动态添加 websocket 处理程序时的竞争条件
Race condition when dynamically adding websocket handlers
我正在用 netty 编写一个 websocket,我的代码中似乎存在竞争条件:
我有一个通道初始化程序,它构建的管道包括:
ch.pipeline().addLast(new HttpServerCodec())
ch.pipeline().addLast(new HttpObjectAggregator(65536))
ch.pipeline().addLast(new MyServer())
而 MyServer 的工作方式如下:
- 如果它收到 websocket 升级请求,它会尝试对请求进行身份验证
- 如果失败,则 returns 错误请求
- 如果成功,它会尝试:
- 按照我的自定义逻辑处理程序添加 websocket 处理程序并完成握手并建立 webscoket 连接
- 使用以下代码完成:
awareLogger.debug(log"upgrading to websocket")(logContext)
ctx.pipeline()
.addLast(new WebSocketServerProtocolHandler(route, true))
.addLast(new WebSocketFrameAggregator(65536))
.addLast(new MyWebsocketLogic(logContext))
ctx.fireChannelRead(httpRequest)
val _ = awareLogger.debug(log"upgraded to websocket")(logContext)
它正在尝试 fireChannelRead(httpRequest)
希望 WebSocketServerProtocolHandler
拦截它并完成握手。
我的问题是 - httpRequest
有时似乎一直传播到 MyWebsocketLogic
处理程序并且无法建立连接和握手。
我做错了什么吗?这几乎就像我在代码中遇到了某种竞争条件。
问题是:
awareLogger.debug(log"upgrading to websocket")(logContext)
ctx.pipeline()
.addLast(new WebSocketServerProtocolHandler(route, true))
.addLast(new WebSocketFrameAggregator(65536))
.addLast(new MyWebsocketLogic(logContext))
ctx.fireChannelRead(httpRequest)
val _ = awareLogger.debug(log"upgraded to websocket")(logContext)
在与分配给给定 ctx
的线程不同的线程中被调用。
我能够通过应用上面 Norman 的建议来解决这个问题,即将管道修改切换到 channel
的 EventLoop
,意思是:
ctx.channel().eventLoop().execute { () =>
val _ = ctx
.pipeline()
.addLast(new WebSocketServerProtocolHandler(route, true))
.addLast(new WebSocketFrameAggregator(65536))
.addLast(buildWebsocketHandler(logContext, connectionHandler))
val _ = ctx.fireChannelRead(msg)
}
这似乎很有效
我正在用 netty 编写一个 websocket,我的代码中似乎存在竞争条件:
我有一个通道初始化程序,它构建的管道包括:
ch.pipeline().addLast(new HttpServerCodec())
ch.pipeline().addLast(new HttpObjectAggregator(65536))
ch.pipeline().addLast(new MyServer())
而 MyServer 的工作方式如下:
- 如果它收到 websocket 升级请求,它会尝试对请求进行身份验证
- 如果失败,则 returns 错误请求
- 如果成功,它会尝试:
- 按照我的自定义逻辑处理程序添加 websocket 处理程序并完成握手并建立 webscoket 连接
- 使用以下代码完成:
awareLogger.debug(log"upgrading to websocket")(logContext)
ctx.pipeline()
.addLast(new WebSocketServerProtocolHandler(route, true))
.addLast(new WebSocketFrameAggregator(65536))
.addLast(new MyWebsocketLogic(logContext))
ctx.fireChannelRead(httpRequest)
val _ = awareLogger.debug(log"upgraded to websocket")(logContext)
它正在尝试 fireChannelRead(httpRequest)
希望 WebSocketServerProtocolHandler
拦截它并完成握手。
我的问题是 - httpRequest
有时似乎一直传播到 MyWebsocketLogic
处理程序并且无法建立连接和握手。
我做错了什么吗?这几乎就像我在代码中遇到了某种竞争条件。
问题是:
awareLogger.debug(log"upgrading to websocket")(logContext)
ctx.pipeline()
.addLast(new WebSocketServerProtocolHandler(route, true))
.addLast(new WebSocketFrameAggregator(65536))
.addLast(new MyWebsocketLogic(logContext))
ctx.fireChannelRead(httpRequest)
val _ = awareLogger.debug(log"upgraded to websocket")(logContext)
在与分配给给定 ctx
的线程不同的线程中被调用。
我能够通过应用上面 Norman 的建议来解决这个问题,即将管道修改切换到 channel
的 EventLoop
,意思是:
ctx.channel().eventLoop().execute { () =>
val _ = ctx
.pipeline()
.addLast(new WebSocketServerProtocolHandler(route, true))
.addLast(new WebSocketFrameAggregator(65536))
.addLast(buildWebsocketHandler(logContext, connectionHandler))
val _ = ctx.fireChannelRead(msg)
}
这似乎很有效