Why aren't the Netty HTTP handlers sharable?

Netty instantiates a set of request handler classes whenever a new connection is opened. This seems fine for something like a websocket, where the connection stays open for the lifetime of the websocket.

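When using Netty as an HTTP server that may receive thousands of requests per second, this seems to put a lot of strain on garbage collection. Every single request instantiates several classes (in my case, 10 handler classes) and then garbage collects them a few milliseconds later.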

In an HTTP server under a moderate load of ~1000 req/sec, that is about ten thousand handler objects to instantiate and garbage collect every second.
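As a back-of-the-envelope check of that estimate, here is the arithmetic spelled out. The class and method names are made up for illustration, and the sketch assumes the worst case where every request arrives on a fresh connection (no keep-alive).

```java
// Hypothetical helper: estimates how many handler objects are created per second
// when every new connection builds a fresh pipeline.
final class HandlerChurn {
    static long handlersPerSecond(int handlersPerPipeline, int newConnectionsPerSecond) {
        return (long) handlersPerPipeline * newConnectionsPerSecond;
    }

    public static void main(String[] args) {
        // 10 handlers per pipeline, 1000 new connections per second (figures from the question)
        System.out.println(handlersPerSecond(10, 1000)); // prints 10000
    }
}
```

Since handlers are created per connection, with keep-alive the second argument is the rate of new connections rather than the request rate, which is usually lower.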

It seems we could simply create sharable handlers (see the answer below), using ChannelHandler.Sharable, to eliminate this large GC overhead. They just need to be thread-safe.
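To make the thread-safety requirement concrete, here is a minimal plain-Java sketch (no Netty dependency; SharedRequestCounter and its methods are hypothetical names). One instance is called from several threads, much as a single sharable handler instance would be invoked by multiple event loop threads, so its only mutable state is a LongAdder.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for a sharable handler: one instance, many calling threads.
final class SharedRequestCounter {
    // LongAdder is safe for concurrent increments; a plain long field could lose updates.
    private final LongAdder requests = new LongAdder();

    // Plays the role of channelRead: called once per request, from any thread.
    void onRequest() {
        requests.increment();
    }

    long total() {
        return requests.sum();
    }

    // Drives one shared instance from several threads and returns the final count.
    static long simulate(int threads, int requestsPerThread) {
        SharedRequestCounter shared = new SharedRequestCounter();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < requestsPerThread; i++) {
                    shared.onRequest();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return shared.total();
    }
}
```

Calling SharedRequestCounter.simulate(4, 10000) should return 40000, because no increment is lost even though all four threads touch the same object.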

However, I see that all of the very basic HTTP handlers packaged in the library are not sharable, such as HttpServerCodec and HttpObjectAggregator. Also, none of the HTTP handler examples are sharable. 99% of example code and tutorials don't seem to bother with it. There was only one blurb in the book by Norman Maurer (a Netty author) that gives a reason for using a shared handler:

WHY SHARE A CHANNELHANDLER?

A common reason for installing a single ChannelHandler in multiple ChannelPipelines is to gather statistics across multiple Channels.

There is no mention anywhere of the GC load issue.


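Netty has been in regular production use for nearly a decade. It is arguably the most widely used Java library for highly concurrent non-blocking IO.

In other words, it is designed to do far more than my modest 1000 requests per second.

Is there something I'm missing that makes the GC load a non-issue?

Or should I try to implement my own Sharable handlers with similar functionality for decoding, encoding, and writing HTTP requests and responses?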

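While we always aim to produce as little GC as possible in netty, there are situations where this is not really possible. For example, the http codecs etc. keep state that is per connection, so they cannot be shared (even if they were thread-safe).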
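The point about per-connection state can be illustrated without Netty. The sketch below is a deliberately simplified, hypothetical decoder (LineDecoder and SharedStateDemo are made-up names) that buffers fragments until a newline arrives. It runs on a single thread, so the corruption it shows has nothing to do with thread safety; it comes purely from two connections sharing one buffer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified decoder: buffers fragments until a newline completes a message.
final class LineDecoder {
    private final StringBuilder partial = new StringBuilder(); // state that belongs to ONE connection

    // Feeds one fragment and returns any lines it completed.
    List<String> feed(String fragment) {
        List<String> complete = new ArrayList<>();
        for (char c : fragment.toCharArray()) {
            if (c == '\n') {
                complete.add(partial.toString());
                partial.setLength(0);
            } else {
                partial.append(c);
            }
        }
        return complete;
    }
}

final class SharedStateDemo {
    // Connection A sends "GET /a\n" and connection B sends "GET /b\n", with fragments interleaved.
    static List<String> decode(LineDecoder forA, LineDecoder forB) {
        List<String> out = new ArrayList<>();
        out.addAll(forA.feed("GET "));
        out.addAll(forB.feed("GET "));
        out.addAll(forA.feed("/a\n"));
        out.addAll(forB.feed("/b\n"));
        return out;
    }

    public static void main(String[] args) {
        LineDecoder shared = new LineDecoder();
        System.out.println(decode(shared, shared));                       // [GET GET /a, /b]
        System.out.println(decode(new LineDecoder(), new LineDecoder())); // [GET /a, GET /b]
    }
}
```

With one shared instance the two requests bleed into each other; with one instance per connection each request decodes cleanly.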

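The only way around this would be to pool them, but I think there are other objects that are much more likely to cause GC problems, and for those we try to pool whenever it is easy to do so.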
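For readers unfamiliar with pooling, here is a rough sketch of the idea in plain Java. SimplePool and PoolDemo are hypothetical names, and this is not Netty's own pooling code; it only shows the acquire, reset, release cycle that lets a stateful object be reused by a later connection instead of being garbage collected.

```java
import java.util.ArrayDeque;
import java.util.function.Supplier;

// Hypothetical minimal object pool for illustration; this is not Netty's pooling code.
final class SimplePool<T> {
    private final ArrayDeque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;
    private int created;

    SimplePool(Supplier<T> factory) {
        this.factory = factory;
    }

    // Reuses an idle instance when one exists, otherwise allocates a new one.
    synchronized T acquire() {
        T pooled = idle.pollFirst();
        if (pooled != null) {
            return pooled;
        }
        created++;
        return factory.get();
    }

    // The caller must reset the object's state before handing it back.
    synchronized void release(T object) {
        idle.addFirst(object);
    }

    synchronized int createdCount() {
        return created;
    }
}

final class PoolDemo {
    // Simulates sequential connections that each need a scratch buffer for their lifetime.
    static int buffersCreated(int connections) {
        SimplePool<StringBuilder> pool = new SimplePool<>(StringBuilder::new);
        for (int i = 0; i < connections; i++) {
            StringBuilder scratch = pool.acquire();
            scratch.append("request ").append(i);
            scratch.setLength(0); // reset per-connection state before reuse
            pool.release(scratch);
        }
        return pool.createdCount();
    }
}
```

In this sequential simulation PoolDemo.buffersCreated(1000) allocates a single StringBuilder. The trade-off is the reset step: any state that is not cleared leaks from one connection into the next.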

TL;DR:

If you reach the volume needed to make GC a problem with the default HTTP handlers, it is time to scale with a proxy server anyway.


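After Norman's answer, I ended up attempting a very bare-bones sharable HTTP codec/aggregator POC to see if this was something worth pursuing.

My sharable decoder was a long way from RFC 7230, but it covered what my current project needs.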

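I then used httperf and visualvm to get a notion of the difference in GC load. For my efforts, I only saw a 10% decrease in GC rate. In other words, it really doesn't make much of a difference.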
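The numbers above came from visualvm. As a complementary sketch (not what was done for this test), the JVM's standard GarbageCollectorMXBean counters can also be read from inside the server process before and after a load run. GcSnapshot is a hypothetical helper name.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical helper: captures cumulative GC counters so two points in time can be compared.
final class GcSnapshot {
    final long collections;      // total number of collections across all collectors
    final long collectionMillis; // approximate accumulated collection time in milliseconds

    private GcSnapshot(long collections, long collectionMillis) {
        this.collections = collections;
        this.collectionMillis = collectionMillis;
    }

    static GcSnapshot take() {
        long count = 0;
        long millis = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Both getters return -1 when the value is undefined for a collector.
            count += Math.max(0, gc.getCollectionCount());
            millis += Math.max(0, gc.getCollectionTime());
        }
        return new GcSnapshot(count, millis);
    }

    // Difference between this snapshot and an earlier one.
    GcSnapshot since(GcSnapshot earlier) {
        return new GcSnapshot(collections - earlier.collections,
                collectionMillis - earlier.collectionMillis);
    }
}
```

Taking one snapshot before the httperf run and one after, then calling after.since(before), gives a per-run collection count and time that can be compared between the two pipelines.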

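The only really appreciable effect was that I had 5% fewer errors at 1000 req/sec with my sharable codec than with the packaged unshared HTTP codec + aggregator. And this only occurred when I sustained 1000 req/sec for longer than 10 seconds.

In the end I won't be pursuing it. The time it would take to turn this into a fully HTTP-compliant decoder is simply not worth it for a tiny benefit that can be had by using a proxy server instead.

For reference purposes, here is the combined sharable decoder/aggregator that I tried: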

import java.util.concurrent.ConcurrentHashMap;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;

@Sharable
public class SharableHttpDecoder extends ChannelInboundHandlerAdapter {

    private static final ConcurrentHashMap<ChannelId, SharableHttpRequest> MAP = 
            new ConcurrentHashMap<ChannelId, SharableHttpRequest>();
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) 
        throws Exception 
    {        
        if (msg instanceof ByteBuf) 
        {
            ByteBuf buf = (ByteBuf) msg;
            ChannelId channelId = ctx.channel().id();
            SharableHttpRequest request = MAP.get(channelId);
                                    
            if (request == null)
            {
                request = new SharableHttpRequest(buf);
                buf.release();
                if (request.isComplete()) 
                {
                    ctx.fireChannelRead(request);
                }
                else
                {
                    MAP.put(channelId, request);
                }
            }
            else
            {
                request.append(buf);
                buf.release();
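                if (request.isComplete()) 
                {
                    // the request is fully assembled: drop the per-channel state so the
                    // static map does not grow without bound, then pass it downstream
                    MAP.remove(channelId);
                    ctx.fireChannelRead(request);
                }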
            }
        }
        else
        {
            // TODO send 501
            System.out.println("WTF is this? " + msg.getClass().getName());
            ctx.fireChannelRead(msg);
        }
    }
    
    @Override
    public void channelInactive(ChannelHandlerContext ctx) 
        throws Exception 
    {
        // the connection is gone: discard any partially assembled request for it
        MAP.remove(ctx.channel().id());
        super.channelInactive(ctx);
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
        throws Exception 
    {
        System.out.println("Unable to handle request on channel: " + 
            ctx.channel().id().asLongText());
        cause.printStackTrace(System.err);
        
        // TODO send 500
        ctx.fireExceptionCaught(cause);
        ctx.close();
    }
    
}
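The core trick in the decoder above is that one shared instance keeps its per-connection state in a map keyed by the channel id. Stripped of Netty, the pattern looks roughly like the sketch below. KeyedLineDecoder is a hypothetical name, connection ids are plain strings, and a newline stands in for "request complete"; anything after the first newline is simply dropped to keep the example short.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of a shared decoder that keeps per-connection state in a keyed map.
final class KeyedLineDecoder {
    private final Map<String, StringBuilder> partials = new ConcurrentHashMap<>();

    // Returns the completed line, or null if this connection still needs more input.
    String feed(String connectionId, String fragment) {
        StringBuilder partial = partials.computeIfAbsent(connectionId, id -> new StringBuilder());
        partial.append(fragment);
        int newline = partial.indexOf("\n");
        if (newline < 0) {
            return null;
        }
        // Message complete: forget the state so the map does not grow forever.
        partials.remove(connectionId);
        return partial.substring(0, newline);
    }

    // Must also be called when a connection closes mid-message.
    void connectionClosed(String connectionId) {
        partials.remove(connectionId);
    }

    int pendingConnections() {
        return partials.size();
    }
}
```

Note that the map still holds one freshly allocated object per in-flight connection (here a StringBuilder, in the decoder above a SharableHttpRequest), so sharing the handler moves the per-connection allocation rather than removing it.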

The resulting object created by the decoder for handling on the pipeline:

import java.util.Arrays;
import java.util.HashMap;
import io.netty.buffer.ByteBuf;

public class SharableHttpRequest
{
    
    private static final byte SPACE = 32;
    private static final byte COLON = 58;
    private static final byte CARRAIGE_RETURN = 13;
    
    private HashMap<Header,String> myHeaders;
    private Method myMethod;
    private String myPath;
    private byte[] myBody;
    private int myIndex = 0;
    
    public SharableHttpRequest(ByteBuf buf)
    {
        try
        {
            myHeaders = new HashMap<Header,String>();
            final StringBuilder builder = new StringBuilder(8);
            parseRequestLine(buf, builder);
            while (parseNextHeader(buf, builder));
            parseBody(buf);
        }
        catch (Exception e)
        {
            e.printStackTrace(System.err);
        }
    }
    
    public String getHeader(Header name)
    {
        return myHeaders.get(name);
    }
    
    public Method getMethod()
    {
        return myMethod;
    }
    
    public String getPath()
    {
        return myPath;
    }
    
    public byte[] getBody()
    {
        return myBody;
    }
    
    public boolean isComplete()
    {
        return myIndex >= myBody.length;
    }
    
    public void append(ByteBuf buf)
    {
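        // never copy past the end of the body array, which was sized from Content-Length
        int length = Math.min(buf.readableBytes(), myBody.length - myIndex);
        buf.getBytes(buf.readerIndex(), myBody, myIndex, length);
        myIndex += length;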
    }

    private void parseRequestLine(ByteBuf buf, StringBuilder builder)
    {
        int idx = buf.readerIndex();
        int end = buf.writerIndex();
        for (; idx < end; ++idx)
        {
            byte next = buf.getByte(idx);
            
            // break on CR
            if (next == CARRAIGE_RETURN)
            {
                break;
            }
            
            // we need the method
            else if (myMethod == null)
            {
                if (next == SPACE)
                {
                    myMethod = Method.fromBuilder(builder);
                    builder.delete(0, builder.length());
                    builder.ensureCapacity(100);
                }
                else
                {
                    builder.append((char) next);
                }
            }
            
            // we need the path
            else if (myPath == null)
            {
                if (next == SPACE)
                {
                    myPath = builder.toString();
                    builder.delete(0, builder.length());
                }
                else
                {
                    builder.append((char) next);
                }
            }
            
            // don't need the version right now
        }
        idx += 2; // skip line endings
        buf.readerIndex(idx);
    }
    
    private boolean parseNextHeader(ByteBuf buf, StringBuilder builder)
    {
        Header header = null;
        int idx = buf.readerIndex();
        int end = buf.writerIndex();
        for (; idx < end; ++idx)
        {
            byte next = buf.getByte(idx);
            
            // break on CR
            if (next == CARRAIGE_RETURN)
            {
                // header is still null when the line had no colon; skip such lines too
                if (header != null && header != Header.UNHANDLED)
                {
                    myHeaders.put(header,builder.toString());
                    builder.delete(0, builder.length());
                }
                break;
            }
            
            else if (header == null)
            {
                // we have the full header name
                if (next == COLON)
                {
                    header = Header.fromBuilder(builder);
                    builder.delete(0, builder.length());
                }

                // get header name as lower case for mapping purposes
                else
                {
                    builder.append(next > 64 && next < 91 ? 
                        (char) ( next | 32 ) : (char) next);
                }
            }
            
            // we don't care about some headers
            else if (header == Header.UNHANDLED)
            {
                continue;
            }
            
            // skip initial spaces
            else if (builder.length() == 0 && next == SPACE)
            {
                continue;
            }
            
            // get the header value
            else
            {
                builder.append((char) next);
            }
        }
        
        idx += 2; // skip line endings
        buf.readerIndex(idx);
        
        if (buf.getByte(idx) == CARRAIGE_RETURN)
        {
            idx += 2; // skip line endings
            buf.readerIndex(idx);
            return false;
        }
        else
        {
            return true;
        }
    }
    
    private void parseBody(ByteBuf buf)
    {
        String contentLength = myHeaders.get(Header.CONTENT_LENGTH);
        if (contentLength == null)
        {
            // no Content-Length header: treat the request as having no body
            myBody = new byte[0];
            return;
        }
        
        int totalLength = Integer.parseInt(contentLength.trim());
        myBody = new byte[totalLength];
        
        // copy whatever part of the body arrived together with the headers;
        // the rest is added later through append()
        int length = Math.min(buf.readableBytes(), totalLength);
        buf.getBytes(buf.readerIndex(), myBody, myIndex, length);
        myIndex += length;
        
        // TODO handle chunked
    }
    
    
    
    
    public enum Method
    {
        GET(new char[]{71, 69, 84}), 
        POST(new char[]{80, 79, 83, 84}),
        UNHANDLED(new char[]{}); // could be expanded if needed
        
        private char[] chars;

        Method(char[] chars) 
        {
            this.chars = chars;
        }
        
        public static Method fromBuilder(StringBuilder builder) 
        {
            for (Method method : Method.values()) 
            {
                if (method.chars.length == builder.length()) 
                {
                    boolean match = true;
                    for (int i = 0; i < builder.length(); i++) 
                    {
                        if (method.chars[i] != builder.charAt(i)) 
                        {
                            match = false;
                            break;
                        }
                    }
                    
                    if (match)
                    {
                        return method;
                    }
                }
            }
            // unknown methods map to UNHANDLED so the request line parser can move on to the path
            return UNHANDLED;
        }
    }
    
    public enum Header
    {
        HOST(new char[]{104, 111, 115, 116}), 
        CONNECTION(new char[]{99, 111, 110, 110, 101, 99, 116, 105, 111, 110}),
        IF_MODIFIED_SINCE(new char[]{
            105, 102, 45, 109, 111, 100, 105, 102, 105, 101, 100, 45, 115, 
            105, 110, 99, 101}),
        COOKIE(new char[]{99, 111, 111, 107, 105, 101}),
        CONTENT_LENGTH(new char[]{
            99, 111, 110, 116, 101, 110, 116, 45, 108, 101, 110, 103, 116, 104}),
        UNHANDLED(new char[]{}); // could be expanded if needed
        
        private char[] chars;

        Header(char[] chars) 
        {
            this.chars = chars;
        }
        
        public static Header fromBuilder(StringBuilder builder) 
        {
            for (Header header : Header.values()) 
            {
                if (header.chars.length == builder.length()) 
                {                    
                    boolean match = true;
                    for (int i = 0; i < builder.length(); i++) 
                    {
                        if (header.chars[i] != builder.charAt(i)) 
                        {
                            match = false;
                            break;
                        }
                    }
                    
                    if (match)
                    {
                        return header;
                    }
                }
            }
            return UNHANDLED;
        }
    }
}

The simple handler used for testing:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

@Sharable
public class SharableHttpHandler extends SimpleChannelInboundHandler<SharableHttpRequest>
{    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, SharableHttpRequest msg) 
        throws Exception
    {
        // the body is 43 bytes once the closing tag is written correctly as </html>
        String message = "HTTP/1.1 200 OK\r\n" +
                "Content-type: text/html\r\n" + 
                "Content-length: 43\r\n\r\n" + 
                "<html><body>Hello sharedworld</body></html>";
        
        ByteBuf buffer = ctx.alloc().buffer(message.length());
        buffer.writeCharSequence(message, CharsetUtil.UTF_8);
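        ChannelFuture flushPromise = ctx.channel().writeAndFlush(buffer);
        // the write is asynchronous, so check its outcome in a listener; calling
        // isSuccess()/cause() inline would usually run before the write has finished
        flushPromise.addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) 
            {
                future.cause().printStackTrace(System.err);
            }
        });
        flushPromise.addListener(ChannelFutureListener.CLOSE);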
    }    
}
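The test handler above hardcodes its Content-length value, which is easy to get wrong when the body changes. A small sketch of deriving it from the encoded body instead, using only the JDK (RawHttpResponse is a hypothetical helper, not part of the code above):

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: builds a raw HTTP/1.1 200 response with a computed Content-Length.
final class RawHttpResponse {
    static byte[] ok(String contentType, String body) {
        // Content-Length counts bytes, not chars, so encode the body first.
        byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
        String head = "HTTP/1.1 200 OK\r\n" +
                "Content-Type: " + contentType + "\r\n" +
                "Content-Length: " + bodyBytes.length + "\r\n\r\n";
        byte[] headBytes = head.getBytes(StandardCharsets.US_ASCII);

        byte[] response = new byte[headBytes.length + bodyBytes.length];
        System.arraycopy(headBytes, 0, response, 0, headBytes.length);
        System.arraycopy(bodyBytes, 0, response, headBytes.length, bodyBytes.length);
        return response;
    }
}
```

In a Netty handler the resulting array could be written into the outgoing buffer with ByteBuf.writeBytes(byte[]) in place of the hand-built string.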

The full pipeline using these sharable handlers:

import tests.SharableHttpDecoder;
import tests.SharableHttpHandler;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class ServerPipeline extends ChannelInitializer<SocketChannel>
{
    private final SharableHttpDecoder decoder = new SharableHttpDecoder();
    private final SharableHttpHandler handler = new SharableHttpHandler();

    @Override
    public void initChannel(SocketChannel channel)
    {
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast(decoder);
        pipeline.addLast(handler);
        
    }
}

The above was tested against this (more usual) unshared pipeline:

import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.util.CharsetUtil;

public class ServerPipeline extends ChannelInitializer<SocketChannel>
{

    @Override
    public void initChannel(SocketChannel channel)
    {
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(65536));
        pipeline.addLast(new UnsharedHttpHandler());
        
    }
    
    class UnsharedHttpHandler extends SimpleChannelInboundHandler<FullHttpRequest>
    {

        @Override
        public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) 
            throws Exception
        {
            String message = "<html><body>Hello sharedworld</body></html>";
            ByteBuf buffer = ctx.alloc().buffer(message.length());
            buffer.writeCharSequence(message, CharsetUtil.UTF_8);

            FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, buffer);
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
            HttpUtil.setContentLength(response, response.content().readableBytes());
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
            ChannelFuture flushPromise = ctx.writeAndFlush(response);
            flushPromise.addListener(ChannelFutureListener.CLOSE);
                        
        }
    }
}