Netty: channelRead reports that Object msg is of type SimpleLeakAwareByteBuf

I am sending a curl POST to my Netty socket server with curl -X POST -d "dsds" 10.0.0.211:5201, but in my channelRead, when I try to cast Object msg to FullHttpRequest, it throws the following exception.

java.lang.ClassCastException: io.netty.buffer.SimpleLeakAwareByteBuf cannot be cast to io.netty.handler.codec.http.FullHttpRequest
        at edu.clemson.openflow.sos.host.netty.HostPacketHandler.channelRead(HostPacketHandler.java:42)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1320)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:905)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:563)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:504)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:418)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:390)
        at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:742)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:145)
        at java.lang.Thread.run(Thread.java:748)

Below is my socket handler class:

@ChannelHandler.Sharable
public class HostPacketHandler extends ChannelInboundHandlerAdapter {
    private static final Logger log = LoggerFactory.getLogger(HostPacketHandler.class);
    private RequestParser request;

    public HostPacketHandler(RequestParser request) {
        this.request = request;
        log.info("Expecting Host at IP {} Port {}",
                request.getClientIP(), request.getClientPort());
    }

    public void setRequestObject(RequestParser requestObject) {
        this.request = requestObject;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Discard the received data silently.
        InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
        log.info("Got Message from {} at Port {}",
                socketAddress.getHostName(),
                socketAddress.getPort());
        //FullHttpRequest request = (FullHttpRequest) msg;
        log.info(msg.getClass().getSimpleName());
        //((ByteBuf) msg).release();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }

}

Pipeline:

public class NettyHostSocketServer implements IClientSocketServer {
    protected static boolean isClientHandlerRunning = false;
    private static final Logger log = LoggerFactory.getLogger(SocketManager.class);
    private static final int CLIENT_DATA_PORT = 9877;
    private static final int MAX_CLIENTS = 5;
    private HostPacketHandler hostPacketHandler;

    public NettyHostSocketServer(RequestParser request) {
        hostPacketHandler = new HostPacketHandler(request);
    }

    private boolean startSocket(int port) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline().addLast(
                                    hostPacketHandler);
                        }
                    });

            ChannelFuture f = b.bind().sync();
            log.info("Started host-side socket server at Port {}",CLIENT_DATA_PORT);
            return true;
            // Need to do socket closing handling. close all the remaining open sockets
            //System.out.println(EchoServer.class.getName() + " started and listen on " + f.channel().localAddress());
            //f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("Error starting host-side socket");
            e.printStackTrace();
            return false;
        } finally {
            //group.shutdownGracefully().sync();
        }
    }

    @Override
    public boolean start() {

        if (!isClientHandlerRunning) {
            isClientHandlerRunning = true;
            return startSocket(CLIENT_DATA_PORT);
        }
        return true;
    }


    @Override
    public int getActiveConnections() {
        return 0;
    }
}

I also used Wireshark to check that I am receiving valid packets. Below is a screenshot of the Wireshark dump.

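Your problem is that you never decode the ByteBuf into an actual HttpRequest object, which is why you get the error. Your pipeline contains only hostPacketHandler, so channelRead receives the raw bytes read from the socket as a ByteBuf (here wrapped in a SimpleLeakAwareByteBuf by Netty's leak detection). You cannot cast a ByteBuf to a FullHttpRequest.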

You should do this instead:

@Override
public void initChannel(Channel channel) throws Exception {
    channel.pipeline().addLast(new HttpRequestDecoder()) // Decodes the ByteBuf into a HttpMessage and HttpContent (1)
        .addLast(new HttpObjectAggregator(1048576)) // Aggregates the HttpMessage with its following HttpContent into a FullHttpRequest
        .addLast(hostPacketHandler);
}

(1) If you also want to send an HttpResponse, use HttpServerCodec in place of this handler; it combines HttpRequestDecoder and HttpResponseEncoder.
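To see the effect of the decoder and aggregator without opening a real socket, the following is a minimal sketch using Netty's EmbeddedChannel. It assumes Netty 4.1 on the classpath. The names HttpPipelineSketch, EchoBodyHandler and roundTrip are hypothetical and are not part of the original code. The handler extends SimpleChannelInboundHandler<FullHttpRequest>, so it only receives fully aggregated requests and no cast is needed.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;

public class HttpPipelineSketch {

    /** Hypothetical handler: it is only invoked with aggregated FullHttpRequest objects. */
    static class EchoBodyHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
            // The request body is available as a ByteBuf via content().
            String body = req.content().toString(CharsetUtil.UTF_8);
            ByteBuf payload = Unpooled.copiedBuffer("got: " + body, CharsetUtil.UTF_8);
            FullHttpResponse resp = new DefaultFullHttpResponse(
                    HttpVersion.HTTP_1_1, HttpResponseStatus.OK, payload);
            resp.headers().set(HttpHeaderNames.CONTENT_LENGTH, payload.readableBytes());
            // HttpServerCodec encodes the response back into bytes.
            ctx.writeAndFlush(resp);
            // SimpleChannelInboundHandler releases req after this method returns.
        }
    }

    /** Feeds raw request bytes through the pipeline and returns the raw response text. */
    static String roundTrip(String rawRequest) {
        EmbeddedChannel ch = new EmbeddedChannel(
                new HttpServerCodec(),              // request decoder + response encoder
                new HttpObjectAggregator(1048576),  // builds a FullHttpRequest (max 1 MiB body)
                new EchoBodyHandler());
        ch.writeInbound(Unpooled.copiedBuffer(rawRequest, CharsetUtil.UTF_8));

        // Collect whatever the encoder wrote to the outbound side.
        StringBuilder out = new StringBuilder();
        ByteBuf chunk;
        while ((chunk = ch.readOutbound()) != null) {
            out.append(chunk.toString(CharsetUtil.UTF_8));
            chunk.release();
        }
        ch.finish();
        return out.toString();
    }

    public static void main(String[] args) {
        // Roughly what curl -X POST -d "dsds" puts on the wire (headers trimmed).
        String raw = "POST / HTTP/1.1\r\n"
                + "Host: 10.0.0.211:5201\r\n"
                + "Content-Length: 4\r\n"
                + "\r\n"
                + "dsds";
        System.out.println(roundTrip(raw));
    }
}
```

In the real server the same three handlers would go into initChannel in the same order. If you keep ChannelInboundHandlerAdapter instead of SimpleChannelInboundHandler, remember to release the message yourself once you are done with it.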