How to use FixedChannelPool with WebSockets?

Hello fellow developers...

I'm trying to implement a fixed-size pool of WebSocket connections. Unfortunately, Netty offers very little guidance on FixedChannelPool; the most you can get is their unit tests.

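The client code I'm using is based on this example: Source Code

I also found out that the handler set with bootstrap.handler(new ChannelInitializer<>()) is overridden by the ChannelPool.

So I tried moving the ChannelInitializer block into: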

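import java.net.URI;

import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;

public class SomeConnectionPoolHandler implements ChannelPoolHandler {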

    private final URI uri = URI.create("ws://some.url:80");

    @Override
    public void channelCreated(Channel ch) {
        String protocol = uri.getScheme();
        if (!"ws".equals(protocol)) {
            throw new IllegalArgumentException("Unsupported protocol: " + protocol);
        }

        DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
        // WebSocket version 13, no subprotocol, no extensions, max frame payload of 1280000 bytes.
        final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                uri, WebSocketVersion.V13, null, false, httpHeaders, 1280000);
        // WebSocketClientHandler is my own handler class; "connection" is not shown in this snippet.
        final WebSocketClientHandler handler = new WebSocketClientHandler(handshaker, connection);
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpClientCodec(), new HttpObjectAggregator(65536), handler);
    }

    @Override
    public void channelReleased(Channel ch) {
        //TODO
    }

    @Override
    public void channelAcquired(Channel ch) {
        //TODO
    }
}
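
As a side note, if only channelCreated needs real logic, Netty's AbstractChannelPoolHandler already ships empty channelAcquired and channelReleased implementations, so the TODO stubs could be dropped. A minimal sketch, assuming Netty 4.1; the class name WebSocketPoolHandler is made up for illustration, and the WebSocket handler from the question is left out because its constructor arguments are not shown:

```java
import io.netty.channel.Channel;
import io.netty.channel.pool.AbstractChannelPoolHandler;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;

// Hypothetical class name; only the HTTP part of the pipeline is set up here.
final class WebSocketPoolHandler extends AbstractChannelPoolHandler {
    @Override
    public void channelCreated(Channel ch) {
        // Called once for every new channel the pool opens.
        ch.pipeline().addLast(new HttpClientCodec(), new HttpObjectAggregator(65536));
        // The WebSocket client handler would be added here as well.
    }
}
```

channelAcquired and channelReleased can still be overridden later if per-acquire bookkeeping turns out to be needed.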

Now my client code looks like this:

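import java.net.URI;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

public class Application {
    // static so that main() can read it
    private static final URI uri = URI.create("ws://some.url:80");

    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        FixedChannelPool channelPool = null;
        Channel ch = null;
        try {
            String protocol = uri.getScheme();
            if (!"ws".equals(protocol)) {
                throw new IllegalArgumentException("Unsupported protocol: " + protocol);
            }
            Bootstrap b = new Bootstrap();
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.remoteAddress(uri.getHost(), uri.getPort());

            b.group(group);
            b.channel(NioSocketChannel.class);
            SomeConnectionPoolHandler connectionPoolHandler = new SomeConnectionPoolHandler();

            // The pool keeps at most 8 connections open.
            channelPool = new FixedChannelPool(b, connectionPoolHandler, 8);
            ch = channelPool.acquire().sync().get();
            ch.writeAndFlush(new TextWebSocketFrame("test")).sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (ch != null) {
                channelPool.release(ch);
            }
            group.shutdownGracefully();
        }
    }
}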

Now I get this error:

java.lang.UnsupportedOperationException: unsupported message type: TextWebSocketFrame (expected: ByteBuf, FileRegion)
    at io.netty.channel.nio.AbstractNioByteChannel.filterOutboundMessage(AbstractNioByteChannel.java:266)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:799)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1291)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.write(CombinedChannelDuplexHandler.java:528)
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:101)
    at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
    at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
    at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1089)
    at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1136)
    at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1078)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:745)

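OK, I found a solution to the problem.

The problem was that I was trying to write data to the Channel before the WebSocket handshake had completed.

In the handler source code you can see that it has a method for getting the ChannelFuture of the WebSocket handshake.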
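
One way to see why the early write fails: as far as I can tell, in Netty 4.1 the client handshaker only adds the WebSocket frame encoder to the pipeline once the handshake request has been written, so before that point a TextWebSocketFrame reaches the socket unencoded. A small sketch that checks this on an EmbeddedChannel (no real network involved); the class and method names are made up for illustration, and the behavior should be verified against the Netty version in use:

```java
import java.net.URI;

import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrameEncoder;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;

// Hypothetical demo class, not part of the original code.
final class HandshakeEncoderDemo {
    // True if some handler in the pipeline can encode WebSocket frames.
    static boolean hasFrameEncoder(EmbeddedChannel ch) {
        return ch.pipeline().get(WebSocketFrameEncoder.class) != null;
    }

    public static void main(String[] args) {
        URI uri = URI.create("ws://some.url:80");
        WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                uri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders(), 1280000);

        // Same HTTP codec as in the pooled pipeline, but on an in-memory channel.
        EmbeddedChannel ch = new EmbeddedChannel(new HttpClientCodec());
        System.out.println("frame encoder before handshake: " + hasFrameEncoder(ch));

        // Writes the HTTP upgrade request into the embedded channel.
        handshaker.handshake(ch);
        System.out.println("frame encoder after handshake request: " + hasFrameEncoder(ch));
    }
}
```

The handshake is only fully complete once the server's response has been processed, which is what handshakeFuture() in the handler tracks.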

public ChannelFuture handshakeFuture() {
    return handshakeFuture;
}

Now, after acquiring a Channel from the FixedChannelPool, I call this method (it waits for the handshake and returns whether it succeeded):

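private boolean isConnectionReady(Channel ch) {
    try {
        // Look the handler up by the name it was registered under in channelCreated.
        WebSocketClientHandler handler = (WebSocketClientHandler) ch.pipeline().get("handler");
        // sync() blocks until the handshake finishes, so this must not be called from an event loop thread.
        return handler.handshakeFuture().sync().isSuccess();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return false;
}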
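
If blocking on sync() is a concern, the same check can be done with a listener instead. This is only a sketch of that idea, not what I actually run: HandshakeAwareWriter and writeWhenReady are made-up names, and the handshake future is passed in as a parameter so the helper does not depend on my handler class.

```java
import java.util.concurrent.CancellationException;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;

// Hypothetical helper, not part of Netty.
final class HandshakeAwareWriter {
    private HandshakeAwareWriter() { }

    /**
     * Writes msg once handshakeFuture succeeds. The returned future completes
     * with the outcome of the write, or fails if the handshake did not succeed.
     */
    static ChannelFuture writeWhenReady(Channel ch, ChannelFuture handshakeFuture, Object msg) {
        ChannelPromise result = ch.newPromise();
        handshakeFuture.addListener((ChannelFutureListener) f -> {
            if (f.isSuccess()) {
                // Handshake done: the write can go through the full WebSocket pipeline.
                ch.writeAndFlush(msg, result);
            } else {
                // Nothing will be written, so release the message to avoid a buffer leak.
                ReferenceCountUtil.release(msg);
                Throwable cause = f.cause();
                result.tryFailure(cause != null ? cause : new CancellationException("handshake cancelled"));
            }
        });
        return result;
    }
}
```

With the handler from above, a call would look like writeWhenReady(ch, handler.handshakeFuture(), new TextWebSocketFrame("test")), and the channel would be released back to the pool in a listener on the returned future.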

I also gave the WebSocketClientHandler a name in channelCreated(Channel ch):

DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, httpHeaders, 1280000);
final WebSocketClientHandler handler = new WebSocketClientHandler(handshaker, connection);
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpClientCodec(),  new HttpObjectAggregator(65536));
pipeline.addLast("handler", handler);
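
The name matters because ch.pipeline().get("handler") looks handlers up by exactly that string and returns null if nothing is registered under it. A small sketch of both lookup styles on an EmbeddedChannel; StubHandler is a made-up stand-in for my WebSocketClientHandler:

```java
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.HttpClientCodec;

// Hypothetical demo class, not part of the original code.
final class NamedHandlerLookup {
    // Stand-in for the real WebSocket client handler.
    static final class StubHandler extends ChannelInboundHandlerAdapter { }

    // Builds an in-memory channel whose pipeline registers the handler under "handler".
    static EmbeddedChannel newChannel(StubHandler handler) {
        EmbeddedChannel ch = new EmbeddedChannel();
        ch.pipeline().addLast(new HttpClientCodec());  // gets an auto-generated name
        ch.pipeline().addLast("handler", handler);     // explicit name used for lookups
        return ch;
    }

    public static void main(String[] args) {
        StubHandler handler = new StubHandler();
        EmbeddedChannel ch = newChannel(handler);

        // Lookup by name needs a cast in real code, as in isConnectionReady.
        System.out.println(ch.pipeline().get("handler") == handler);
        // Lookup by type avoids both the string and the cast.
        System.out.println(ch.pipeline().get(StubHandler.class) == handler);
        // An unknown name yields null rather than an exception.
        System.out.println(ch.pipeline().get("missing") == null);
    }
}
```

Looking the handler up by class would make the explicit name optional, at the cost of assuming there is only one handler of that type in the pipeline.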

I know this code looks pretty bad right now and should be refactored.