Netty - 周期性 HTTP 请求的复用通道

Netty - Reuse Channel for periodic HTTP requests

我想与 HTTPS 服务器建立连接(例如 google.com)并定期获取最新内容。

我写了简单的 HTTP 客户端:

public class AsyncLoader {
    private static final String HOST = "google.com";
    private static final int PORT = 443;

    public static void main(String[] args) throws InterruptedException, IOException, URISyntaxException {
        final SslContext sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();

        EventLoopGroup elg = new NioEventLoopGroup();
        Bootstrap cb = new Bootstrap()
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.SO_REUSEADDR, false)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .group(elg)
                .channel(NioSocketChannel.class)
                .remoteAddress(HOST, PORT)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                                .addLast("ssl", new SslHandler(sslCtx.newEngine(ch.alloc())))
                                .addLast("http", new HttpClientCodec(4096, 8192, 8192, true, true))
                                .addLast("simple", new SimpleChannelInboundHandler<HttpObject>() {
                                    @Override
                                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
//                                        ctx.writeAndFlush(createReq());
                                    }

                                    @Override
                                    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
                                        System.out.println(msg);
//                                        ctx.writeAndFlush(createReq());
                                    }
                                })
                                .addLast(new LoggingHandler());
                    }
                });

        Channel channel = cb.connect().sync().channel();

        channel.write(createReq());
        Thread.sleep(1000L);
        channel.write(createReq());
    }

    private static DefaultFullHttpRequest createReq() throws URISyntaxException {
        DefaultFullHttpRequest requestCopy = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
                new URI("/").toASCIIString());
        HttpHeaders headersCopy = requestCopy.headers();
        headersCopy.set(HttpHeaderNames.HOST, HOST);
        headersCopy.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        return requestCopy;
    }
}

main 方法的末尾,我发送了两个延迟 1 秒的 HTTP 请求。服务器响应第一个请求,但不响应第二个...

我已启用详细日志记录:

FINE: -Dio.netty.recycler.maxCapacity.maxCapacity: 262144
Jan 28, 2016 4:32:30 PM io.netty.handler.logging.LoggingHandler write


FINE: [id: 0x2baa7120, /172.21.222.178:32972 => google.com/173.194.220.138:443] WRITE, DefaultFullHttpRequest(decodeResult: success, version: HTTP/1.1, content: UnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 0))
GET / HTTP/1.1
host: google.com
connection: keep-alive, 0B


Jan 28, 2016 4:32:30 PM io.netty.handler.ssl.util.InsecureTrustManagerFactory checkServerTrusted
FINE: Accepting a server certificate: CN=*.google.com, O=Google Inc, L=Mountain View, ST=California, C=US
Jan 28, 2016 4:32:30 PM io.netty.handler.ssl.CipherSuiteConverter cacheFromOpenSsl
FINE: Cipher suite mapping: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 => ECDHE-RSA-AES128-GCM-SHA256
Jan 28, 2016 4:32:30 PM io.netty.handler.ssl.CipherSuiteConverter cacheFromOpenSsl
FINE: Cipher suite mapping: SSL_ECDHE_RSA_WITH_AES_128_GCM_SHA256 => ECDHE-RSA-AES128-GCM-SHA256
Jan 28, 2016 4:32:30 PM io.netty.handler.ssl.SslHandler setHandshakeSuccess
FINE: [id: 0x2baa7120, /172.21.222.178:32972 => google.com/173.194.220.138:443] HANDSHAKEN: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
Jan 28, 2016 4:32:30 PM io.netty.handler.logging.LoggingHandler userEventTriggered
FINE: [id: 0x2baa7120, /172.21.222.178:32972 => google.com/173.194.220.138:443] USER_EVENT: SslHandshakeCompletionEvent(SUCCESS)


DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
HTTP/1.1 302 Found
Location: https://ipv4.google.com/sorry/IndexRedirect?continue=https://google.com/&q=CGMSBFuXuysY7rCotQUiGQDxp4NLP1T70JLoOfxaOtbIYimcgkkrqxE
Date: Thu, 28 Jan 2016 13:32:31 GMT
Pragma: no-cache
Expires: Fri, 01 Jan 1990 00:00:00 GMT
Cache-Control: no-store, no-cache, must-revalidate, post-check=0, pre-check=0
Content-Type: text/html; charset=UTF-8
Server: HTTP server (unknown)
Content-Length: 331
X-XSS-Protection: 1; mode=block
X-Frame-Options: SAMEORIGIN
Alternate-Protocol: 443:quic,p=1
Alt-Svc: quic=":443"; ma=604800; v="30,29,28,27,26,25"
DefaultLastHttpContent(data: SlicedAbstractByteBuf(ridx: 0, widx: 331, cap: 331/331, unwrapped: PooledUnsafeDirectByteBuf(ridx: 913, widx: 913, cap: 942)), decoderResult: success)


Jan 28, 2016 4:32:32 PM io.netty.handler.logging.LoggingHandler write


FINE: [id: 0x2baa7120, /172.21.222.178:32972 => google.com/173.194.220.138:443] WRITE, DefaultFullHttpRequest(decodeResult: success, version: HTTP/1.1, content: UnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 0))
GET / HTTP/1.1
host: google.com
connection: keep-alive, 0B

Jan 28, 2016 4:36:31 PM io.netty.handler.logging.LoggingHandler channelInactive
FINE: [id: 0x2baa7120, /172.21.222.178:32972 :> google.com/173.194.220.138:443] INACTIVE
Jan 28, 2016 4:36:31 PM io.netty.handler.logging.LoggingHandler exceptionCaught
FINE: [id: 0x2baa7120, /172.21.222.178:32972 :> google.com/173.194.220.138:443] EXCEPTION: io.netty.handler.codec.PrematureChannelClosureException: channel gone inactive with 1 missing response(s)
io.netty.handler.codec.PrematureChannelClosureException: channel gone inactive with 1 missing response(s)
    at io.netty.handler.codec.http.HttpClientCodec$Decoder.channelInactive(HttpClientCodec.java:228)
    at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:213)
    at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelInactiveNow(ChannelHandlerInvokerUtil.java:56)
    at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelInactive(DefaultChannelHandlerInvoker.java:93)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:133)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:332)
    at io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:724)
    at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelInactiveNow(ChannelHandlerInvokerUtil.java:56)
    at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelInactive(DefaultChannelHandlerInvoker.java:93)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:133)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:895)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.run(AbstractChannel.java:719)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:339)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:356)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:742)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Thread.java:745)

如您所见,服务器响应了第一个请求并忽略了第二个请求。

但是如果我删除:

channel.write(createReq());
Thread.sleep(1000L);
channel.write(createReq());

并取消注释 ChannelInitializer:

中的代码
.addLast("simple", new SimpleChannelInboundHandler<HttpObject>() {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(createReq());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        System.out.println(msg);
        ctx.writeAndFlush(createReq());
    }
 })

一切都会顺利进行,我得到了预期的效果:

DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
HTTP/1.1 302 Found
Location: https://ipv4.google.com/sorry/IndexRedirect?continue=https://google.com/&q=CGMSBFuXuysYprSotQUiGQDxp4NLFkyG4_X9zrF5oyzQ5olUQEgR-54
Date: Thu, 28 Jan 2016 13:39:50 GMT
Pragma: no-cache
Expires: Fri, 01 Jan 1990 00:00:00 GMT
Cache-Control: no-store, no-cache, must-revalidate, post-check=0, pre-check=0
Content-Type: text/html; charset=UTF-8
Server: HTTP server (unknown)
Content-Length: 331
X-XSS-Protection: 1; mode=block
X-Frame-Options: SAMEORIGIN
Alternate-Protocol: 443:quic,p=1
Alt-Svc: quic=":443"; ma=604800; v="30,29,28,27,26,25"
DefaultLastHttpContent(data: SlicedAbstractByteBuf(ridx: 0, widx: 331, cap: 331/331, unwrapped: PooledUnsafeDirectByteBuf(ridx: 913, widx: 913, cap: 942)), decoderResult: success)
DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
HTTP/1.1 302 Found
Location: https://ipv4.google.com/sorry/IndexRedirect?continue=https://google.com/&q=CGMSBFuXuysYprSotQUiGQDxp4NLFkyG4_X9zrF5oyzQ5olUQEgR-54
Date: Thu, 28 Jan 2016 13:39:51 GMT
Pragma: no-cache
Expires: Fri, 01 Jan 1990 00:00:00 GMT
Cache-Control: no-store, no-cache, must-revalidate, post-check=0, pre-check=0
Content-Type: text/html; charset=UTF-8
Server: HTTP server (unknown)
Content-Length: 331
X-XSS-Protection: 1; mode=block
X-Frame-Options: SAMEORIGIN
Alternate-Protocol: 443:quic,p=1
Alt-Svc: quic=":443"; ma=604800; v="30,29,28,27,26,25"
DefaultLastHttpContent(data: SlicedAbstractByteBuf(ridx: 0, widx: 331, cap: 331/331, unwrapped: PooledUnsafeDirectByteBuf(ridx: 913, widx: 913, cap: 942)), decoderResult: success)

我的问题是什么?我不能在 Netty 的回调方法之外写入通道?

当使用 netty 时,重要的是要确保每次调用 write() 之后都会调用 flush(),当你写入你的频道时,你写入时没有刷新,导致数据留在程序的内存中,不被发送出去

将您的呼叫从 write() 更改为 writeAndFlush()