从不同 InboundChannelHandlerAdapter.channelRead 调用 writeAndFlush 时的 netty 问题

netty issue when writeAndFlush called from different InboundChannelHandlerAdapter.channelRead

我遇到了一个问题,出于安全原因,我无法 post 完整代码(抱歉)。我的问题的要点是我有一个 ServerBootstrap,创建如下:

bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
final ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)                        
.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
public void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addFirst("idleStateHandler", new IdleStateHandler(0, 0, 3000));
                                //Adds the MQTT encoder and decoder
                                ch.pipeline().addLast("decoder", new MyMessageDecoder());
                                ch.pipeline().addLast("encoder", new MyMessageEncoder());
                                ch.pipeline().addLast(createMyHandler());
                            }
                        }).option(ChannelOption.SO_BACKLOG, 128).option(ChannelOption.SO_REUSEADDR, true)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                // Bind and start to accept incoming connections.
                channelFuture = b.bind(listenAddress, listenPort);

使用 createMyHandlerMethod() 基本上 returns ChannelInboundHandlerAdapter

的扩展实现

我还有一个 "client" 侦听器,用于侦听传入的连接请求,加载方式如下:

    final String host = getHost();
        final int port = getPort();
        nioEventLoopGroup = new NioEventLoopGroup();

        bootStrap = new Bootstrap();
        bootStrap.group(nioEventLoopGroup);
        bootStrap.channel(NioSocketChannel.class);
        bootStrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootStrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addFirst("idleStateHandler", new IdleStateHandler(0, 0, getKeepAliveInterval()));
                ch.pipeline().addAfter("idleStateHandler", "idleEventHandler", new MoquetteIdleTimeoutHandler());
                ch.pipeline().addLast("decoder", new MyMessageDecoder());
                ch.pipeline().addLast("encoder", new MyMessageEncoder());
                ch.pipeline().addLast(MyClientHandler.this);
            }
        })
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.TCP_NODELAY, true);

        // Start the client.
        try {
            channelFuture = bootStrap.connect(host, port).sync();
        } catch (InterruptedException e) {
            throw new MyException(“Exception”, e);
        }

其中 MyClientHandler 又是 ChannelInboundHandlerAdapter 的子类实例。一切正常,我收到来自 "server" 适配器的消息,我处理它们,然后在相同的上下文中将它们发回。 "client" 处理程序反之亦然。

当我必须(对于某些消息)将它们从服务器或客户端处理程序代理到其他连接时,就会出现问题。再次,我很抱歉不能 post 很多代码,但它的要点是我打电话给:

serverHandler.channelRead(ChannelHandlerContext ctx, Object msg) {
     if (msg instanceof myProxyingMessage) {
        if (ctx.channel().isActive()) {
             ctx.channel().writeAndFlush(someOtherMessage);
             **getClientHandler().writeAndFlush(myProxyingMessage);**
        }
     }
}

现在问题来了:加粗的(客户端)writeAndFlush - 从未实际写入消息字节,它不会抛出任何错误。 ChannelFuture returns 全部为 false(成功、取消、完成)。如果我同步它,最终它会因其他原因超时(我的代码中设置了连接超时)。

我知道我还没有 post 编辑我的所有代码,但我希望有人有一些提示 and/or 如何隔离为什么它不写的问题到客户端上下文。无论如何,我都不是 Netty 专家,而且大部分代码都是别人写的。它们都是 ChannelInboundHandlerAdapter

的子类

如有任何问题,欢迎随时提出。

*****编辑********* 我尝试使用以下测试代码将请求代理回不同的 context/channel(即客户端通道):

public void proxyPubRec(int messageId) throws MQTTException {
        logger.log(logLevel, "proxying PUBREC to context:  " + debugContext());
        PubRecMessage pubRecMessage = new PubRecMessage();
        pubRecMessage.setMessageID(messageId);
        pubRecMessage.setRemainingLength(2);
        logger.log(logLevel, "pipeline writable flag:  " + ctx.pipeline().channel().isWritable());
        MyMQTTEncoder encoder = new MyMQTTEncoder();

        ByteBuf buff = null;
        try {
            buff = encoder.encode(pubRecMessage);
            ctx.channel().writeAndFlush(buff);

        } catch (Throwable t) {
            logger.log(Level.SEVERE, "unable to encode PUBREC");
        } finally {
            if (buff != null) {
                buff.release();
            }
        }
    }

    public class MyMQTTEncoder extends MQTTEncoder {
        public ByteBuf encode(AbstractMessage msg) {
            PooledByteBufAllocator allocator = new PooledByteBufAllocator();
            ByteBuf buf = allocator.buffer();
            try {
                super.encode(ctx, msg, buf);
            } catch (Throwable t) {
                logger.log(Level.SEVERE, "unable to encode PUBREC, " + t.getMessage());
            }
            return buf;
        }
    }

但上面的行:ctx.channel().writeAndFlush(buff) 没有写入其他通道 - 任何 tips/tricks 调试此类问题?

someOtherMessage 必须是 ByteBuf.

所以,拿这个:

serverHandler.channelRead(ChannelHandlerContext ctx, Object msg) {
     if (msg instanceof myProxyingMessage) {
        if (ctx.channel().isActive()) {
             ctx.channel().writeAndFlush(someOtherMessage);
             **getClientHandler().writeAndFlush(myProxyingMessage);**
        }
     }
}

... 并将其替换为:

serverHandler.channelRead(ChannelHandlerContext ctx, Object msg) {
     if (msg instanceof myProxyingMessage) {
        if (ctx.channel().isActive()) {
             ctx.channel().writeAndFlush(ByteBuf);
             **getClientHandler().writeAndFlush(myProxyingMessage);**
        }
     }
}

实际上,这是一个线程问题。我的一个线程是 blocked/waiting,而其他线程正在写入上下文,因此,写入被缓冲而不发送,即使刷新也是如此。问题解决了!

本质上,我将第一个消息代码放在一个 Runnable/Executor 线程中,这允许它单独 运行 以便第二个 write/response 能够写入上下文。这仍然可能存在一些问题(在消息排序方面),但这不是原始问题的主题。感谢您的帮助!