Netty - 周期性 HTTP 请求的复用通道
Netty - Reuse Channel for periodic HTTP requests
我想与 HTTPS 服务器建立连接(例如 google.com)并定期获取最新内容。
我写了简单的 HTTP 客户端:
public class AsyncLoader {
private static final String HOST = "google.com";
private static final int PORT = 443;
public static void main(String[] args) throws InterruptedException, IOException, URISyntaxException {
final SslContext sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
EventLoopGroup elg = new NioEventLoopGroup();
Bootstrap cb = new Bootstrap()
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_REUSEADDR, false)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.group(elg)
.channel(NioSocketChannel.class)
.remoteAddress(HOST, PORT)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast("ssl", new SslHandler(sslCtx.newEngine(ch.alloc())))
.addLast("http", new HttpClientCodec(4096, 8192, 8192, true, true))
.addLast("simple", new SimpleChannelInboundHandler<HttpObject>() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// ctx.writeAndFlush(createReq());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
System.out.println(msg);
// ctx.writeAndFlush(createReq());
}
})
.addLast(new LoggingHandler());
}
});
Channel channel = cb.connect().sync().channel();
channel.write(createReq());
Thread.sleep(1000L);
channel.write(createReq());
}
private static DefaultFullHttpRequest createReq() throws URISyntaxException {
DefaultFullHttpRequest requestCopy = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
new URI("/").toASCIIString());
HttpHeaders headersCopy = requestCopy.headers();
headersCopy.set(HttpHeaderNames.HOST, HOST);
headersCopy.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
return requestCopy;
}
}
在 main 方法的末尾,我发送了两个延迟 1 秒的 HTTP 请求。服务器响应第一个请求,但不响应第二个...
我已启用详细日志记录:
FINE: -Dio.netty.recycler.maxCapacity.maxCapacity: 262144
Jan 28, 2016 4:32:30 PM io.netty.handler.logging.LoggingHandler write
FINE: [id: 0x2baa7120, /172.21.222.178:32972 => google.com/173.194.220.138:443] WRITE, DefaultFullHttpRequest(decodeResult: success, version: HTTP/1.1, content: UnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 0))
GET / HTTP/1.1
host: google.com
connection: keep-alive, 0B
Jan 28, 2016 4:32:30 PM io.netty.handler.ssl.util.InsecureTrustManagerFactory checkServerTrusted
FINE: Accepting a server certificate: CN=*.google.com, O=Google Inc, L=Mountain View, ST=California, C=US
Jan 28, 2016 4:32:30 PM io.netty.handler.ssl.CipherSuiteConverter cacheFromOpenSsl
FINE: Cipher suite mapping: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 => ECDHE-RSA-AES128-GCM-SHA256
Jan 28, 2016 4:32:30 PM io.netty.handler.ssl.CipherSuiteConverter cacheFromOpenSsl
FINE: Cipher suite mapping: SSL_ECDHE_RSA_WITH_AES_128_GCM_SHA256 => ECDHE-RSA-AES128-GCM-SHA256
Jan 28, 2016 4:32:30 PM io.netty.handler.ssl.SslHandler setHandshakeSuccess
FINE: [id: 0x2baa7120, /172.21.222.178:32972 => google.com/173.194.220.138:443] HANDSHAKEN: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
Jan 28, 2016 4:32:30 PM io.netty.handler.logging.LoggingHandler userEventTriggered
FINE: [id: 0x2baa7120, /172.21.222.178:32972 => google.com/173.194.220.138:443] USER_EVENT: SslHandshakeCompletionEvent(SUCCESS)
DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
HTTP/1.1 302 Found
Location: https://ipv4.google.com/sorry/IndexRedirect?continue=https://google.com/&q=CGMSBFuXuysY7rCotQUiGQDxp4NLP1T70JLoOfxaOtbIYimcgkkrqxE
Date: Thu, 28 Jan 2016 13:32:31 GMT
Pragma: no-cache
Expires: Fri, 01 Jan 1990 00:00:00 GMT
Cache-Control: no-store, no-cache, must-revalidate, post-check=0, pre-check=0
Content-Type: text/html; charset=UTF-8
Server: HTTP server (unknown)
Content-Length: 331
X-XSS-Protection: 1; mode=block
X-Frame-Options: SAMEORIGIN
Alternate-Protocol: 443:quic,p=1
Alt-Svc: quic=":443"; ma=604800; v="30,29,28,27,26,25"
DefaultLastHttpContent(data: SlicedAbstractByteBuf(ridx: 0, widx: 331, cap: 331/331, unwrapped: PooledUnsafeDirectByteBuf(ridx: 913, widx: 913, cap: 942)), decoderResult: success)
Jan 28, 2016 4:32:32 PM io.netty.handler.logging.LoggingHandler write
FINE: [id: 0x2baa7120, /172.21.222.178:32972 => google.com/173.194.220.138:443] WRITE, DefaultFullHttpRequest(decodeResult: success, version: HTTP/1.1, content: UnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 0))
GET / HTTP/1.1
host: google.com
connection: keep-alive, 0B
Jan 28, 2016 4:36:31 PM io.netty.handler.logging.LoggingHandler channelInactive
FINE: [id: 0x2baa7120, /172.21.222.178:32972 :> google.com/173.194.220.138:443] INACTIVE
Jan 28, 2016 4:36:31 PM io.netty.handler.logging.LoggingHandler exceptionCaught
FINE: [id: 0x2baa7120, /172.21.222.178:32972 :> google.com/173.194.220.138:443] EXCEPTION: io.netty.handler.codec.PrematureChannelClosureException: channel gone inactive with 1 missing response(s)
io.netty.handler.codec.PrematureChannelClosureException: channel gone inactive with 1 missing response(s)
at io.netty.handler.codec.http.HttpClientCodec$Decoder.channelInactive(HttpClientCodec.java:228)
at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:213)
at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelInactiveNow(ChannelHandlerInvokerUtil.java:56)
at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelInactive(DefaultChannelHandlerInvoker.java:93)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:133)
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:332)
at io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:724)
at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelInactiveNow(ChannelHandlerInvokerUtil.java:56)
at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelInactive(DefaultChannelHandlerInvoker.java:93)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:133)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:895)
at io.netty.channel.AbstractChannel$AbstractUnsafe.run(AbstractChannel.java:719)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:339)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:356)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:742)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
如您所见,服务器响应了第一个请求并忽略了第二个请求。
但是如果我删除:
channel.write(createReq());
Thread.sleep(1000L);
channel.write(createReq());
并取消注释 ChannelInitializer:
中的代码
.addLast("simple", new SimpleChannelInboundHandler<HttpObject>() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(createReq());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
System.out.println(msg);
ctx.writeAndFlush(createReq());
}
})
一切都会顺利进行,我得到了预期的效果:
DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
HTTP/1.1 302 Found
Location: https://ipv4.google.com/sorry/IndexRedirect?continue=https://google.com/&q=CGMSBFuXuysYprSotQUiGQDxp4NLFkyG4_X9zrF5oyzQ5olUQEgR-54
Date: Thu, 28 Jan 2016 13:39:50 GMT
Pragma: no-cache
Expires: Fri, 01 Jan 1990 00:00:00 GMT
Cache-Control: no-store, no-cache, must-revalidate, post-check=0, pre-check=0
Content-Type: text/html; charset=UTF-8
Server: HTTP server (unknown)
Content-Length: 331
X-XSS-Protection: 1; mode=block
X-Frame-Options: SAMEORIGIN
Alternate-Protocol: 443:quic,p=1
Alt-Svc: quic=":443"; ma=604800; v="30,29,28,27,26,25"
DefaultLastHttpContent(data: SlicedAbstractByteBuf(ridx: 0, widx: 331, cap: 331/331, unwrapped: PooledUnsafeDirectByteBuf(ridx: 913, widx: 913, cap: 942)), decoderResult: success)
DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
HTTP/1.1 302 Found
Location: https://ipv4.google.com/sorry/IndexRedirect?continue=https://google.com/&q=CGMSBFuXuysYprSotQUiGQDxp4NLFkyG4_X9zrF5oyzQ5olUQEgR-54
Date: Thu, 28 Jan 2016 13:39:51 GMT
Pragma: no-cache
Expires: Fri, 01 Jan 1990 00:00:00 GMT
Cache-Control: no-store, no-cache, must-revalidate, post-check=0, pre-check=0
Content-Type: text/html; charset=UTF-8
Server: HTTP server (unknown)
Content-Length: 331
X-XSS-Protection: 1; mode=block
X-Frame-Options: SAMEORIGIN
Alternate-Protocol: 443:quic,p=1
Alt-Svc: quic=":443"; ma=604800; v="30,29,28,27,26,25"
DefaultLastHttpContent(data: SlicedAbstractByteBuf(ridx: 0, widx: 331, cap: 331/331, unwrapped: PooledUnsafeDirectByteBuf(ridx: 913, widx: 913, cap: 942)), decoderResult: success)
我的问题是什么?我不能在 Netty 的回调方法之外写入通道?
当使用 netty 时,重要的是要确保每次调用 write()
之后都会调用 flush()
,当你写入你的频道时,你写入时没有刷新,导致数据留在程序的内存中,不被发送出去
将您的呼叫从 write()
更改为 writeAndFlush()
。
我想与 HTTPS 服务器建立连接(例如 google.com)并定期获取最新内容。
我写了简单的 HTTP 客户端:
public class AsyncLoader {
private static final String HOST = "google.com";
private static final int PORT = 443;
public static void main(String[] args) throws InterruptedException, IOException, URISyntaxException {
final SslContext sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
EventLoopGroup elg = new NioEventLoopGroup();
Bootstrap cb = new Bootstrap()
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_REUSEADDR, false)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.group(elg)
.channel(NioSocketChannel.class)
.remoteAddress(HOST, PORT)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast("ssl", new SslHandler(sslCtx.newEngine(ch.alloc())))
.addLast("http", new HttpClientCodec(4096, 8192, 8192, true, true))
.addLast("simple", new SimpleChannelInboundHandler<HttpObject>() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// ctx.writeAndFlush(createReq());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
System.out.println(msg);
// ctx.writeAndFlush(createReq());
}
})
.addLast(new LoggingHandler());
}
});
Channel channel = cb.connect().sync().channel();
channel.write(createReq());
Thread.sleep(1000L);
channel.write(createReq());
}
private static DefaultFullHttpRequest createReq() throws URISyntaxException {
DefaultFullHttpRequest requestCopy = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
new URI("/").toASCIIString());
HttpHeaders headersCopy = requestCopy.headers();
headersCopy.set(HttpHeaderNames.HOST, HOST);
headersCopy.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
return requestCopy;
}
}
在 main 方法的末尾,我发送了两个延迟 1 秒的 HTTP 请求。服务器响应第一个请求,但不响应第二个...
我已启用详细日志记录:
FINE: -Dio.netty.recycler.maxCapacity.maxCapacity: 262144
Jan 28, 2016 4:32:30 PM io.netty.handler.logging.LoggingHandler write
FINE: [id: 0x2baa7120, /172.21.222.178:32972 => google.com/173.194.220.138:443] WRITE, DefaultFullHttpRequest(decodeResult: success, version: HTTP/1.1, content: UnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 0))
GET / HTTP/1.1
host: google.com
connection: keep-alive, 0B
Jan 28, 2016 4:32:30 PM io.netty.handler.ssl.util.InsecureTrustManagerFactory checkServerTrusted
FINE: Accepting a server certificate: CN=*.google.com, O=Google Inc, L=Mountain View, ST=California, C=US
Jan 28, 2016 4:32:30 PM io.netty.handler.ssl.CipherSuiteConverter cacheFromOpenSsl
FINE: Cipher suite mapping: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 => ECDHE-RSA-AES128-GCM-SHA256
Jan 28, 2016 4:32:30 PM io.netty.handler.ssl.CipherSuiteConverter cacheFromOpenSsl
FINE: Cipher suite mapping: SSL_ECDHE_RSA_WITH_AES_128_GCM_SHA256 => ECDHE-RSA-AES128-GCM-SHA256
Jan 28, 2016 4:32:30 PM io.netty.handler.ssl.SslHandler setHandshakeSuccess
FINE: [id: 0x2baa7120, /172.21.222.178:32972 => google.com/173.194.220.138:443] HANDSHAKEN: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
Jan 28, 2016 4:32:30 PM io.netty.handler.logging.LoggingHandler userEventTriggered
FINE: [id: 0x2baa7120, /172.21.222.178:32972 => google.com/173.194.220.138:443] USER_EVENT: SslHandshakeCompletionEvent(SUCCESS)
DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
HTTP/1.1 302 Found
Location: https://ipv4.google.com/sorry/IndexRedirect?continue=https://google.com/&q=CGMSBFuXuysY7rCotQUiGQDxp4NLP1T70JLoOfxaOtbIYimcgkkrqxE
Date: Thu, 28 Jan 2016 13:32:31 GMT
Pragma: no-cache
Expires: Fri, 01 Jan 1990 00:00:00 GMT
Cache-Control: no-store, no-cache, must-revalidate, post-check=0, pre-check=0
Content-Type: text/html; charset=UTF-8
Server: HTTP server (unknown)
Content-Length: 331
X-XSS-Protection: 1; mode=block
X-Frame-Options: SAMEORIGIN
Alternate-Protocol: 443:quic,p=1
Alt-Svc: quic=":443"; ma=604800; v="30,29,28,27,26,25"
DefaultLastHttpContent(data: SlicedAbstractByteBuf(ridx: 0, widx: 331, cap: 331/331, unwrapped: PooledUnsafeDirectByteBuf(ridx: 913, widx: 913, cap: 942)), decoderResult: success)
Jan 28, 2016 4:32:32 PM io.netty.handler.logging.LoggingHandler write
FINE: [id: 0x2baa7120, /172.21.222.178:32972 => google.com/173.194.220.138:443] WRITE, DefaultFullHttpRequest(decodeResult: success, version: HTTP/1.1, content: UnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 0))
GET / HTTP/1.1
host: google.com
connection: keep-alive, 0B
Jan 28, 2016 4:36:31 PM io.netty.handler.logging.LoggingHandler channelInactive
FINE: [id: 0x2baa7120, /172.21.222.178:32972 :> google.com/173.194.220.138:443] INACTIVE
Jan 28, 2016 4:36:31 PM io.netty.handler.logging.LoggingHandler exceptionCaught
FINE: [id: 0x2baa7120, /172.21.222.178:32972 :> google.com/173.194.220.138:443] EXCEPTION: io.netty.handler.codec.PrematureChannelClosureException: channel gone inactive with 1 missing response(s)
io.netty.handler.codec.PrematureChannelClosureException: channel gone inactive with 1 missing response(s)
at io.netty.handler.codec.http.HttpClientCodec$Decoder.channelInactive(HttpClientCodec.java:228)
at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:213)
at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelInactiveNow(ChannelHandlerInvokerUtil.java:56)
at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelInactive(DefaultChannelHandlerInvoker.java:93)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:133)
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:332)
at io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:724)
at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelInactiveNow(ChannelHandlerInvokerUtil.java:56)
at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelInactive(DefaultChannelHandlerInvoker.java:93)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:133)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:895)
at io.netty.channel.AbstractChannel$AbstractUnsafe.run(AbstractChannel.java:719)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:339)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:356)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:742)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
如您所见,服务器响应了第一个请求并忽略了第二个请求。
但是如果我删除:
channel.write(createReq());
Thread.sleep(1000L);
channel.write(createReq());
并取消注释 ChannelInitializer:
中的代码.addLast("simple", new SimpleChannelInboundHandler<HttpObject>() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(createReq());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
System.out.println(msg);
ctx.writeAndFlush(createReq());
}
})
一切都会顺利进行,我得到了预期的效果:
DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
HTTP/1.1 302 Found
Location: https://ipv4.google.com/sorry/IndexRedirect?continue=https://google.com/&q=CGMSBFuXuysYprSotQUiGQDxp4NLFkyG4_X9zrF5oyzQ5olUQEgR-54
Date: Thu, 28 Jan 2016 13:39:50 GMT
Pragma: no-cache
Expires: Fri, 01 Jan 1990 00:00:00 GMT
Cache-Control: no-store, no-cache, must-revalidate, post-check=0, pre-check=0
Content-Type: text/html; charset=UTF-8
Server: HTTP server (unknown)
Content-Length: 331
X-XSS-Protection: 1; mode=block
X-Frame-Options: SAMEORIGIN
Alternate-Protocol: 443:quic,p=1
Alt-Svc: quic=":443"; ma=604800; v="30,29,28,27,26,25"
DefaultLastHttpContent(data: SlicedAbstractByteBuf(ridx: 0, widx: 331, cap: 331/331, unwrapped: PooledUnsafeDirectByteBuf(ridx: 913, widx: 913, cap: 942)), decoderResult: success)
DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
HTTP/1.1 302 Found
Location: https://ipv4.google.com/sorry/IndexRedirect?continue=https://google.com/&q=CGMSBFuXuysYprSotQUiGQDxp4NLFkyG4_X9zrF5oyzQ5olUQEgR-54
Date: Thu, 28 Jan 2016 13:39:51 GMT
Pragma: no-cache
Expires: Fri, 01 Jan 1990 00:00:00 GMT
Cache-Control: no-store, no-cache, must-revalidate, post-check=0, pre-check=0
Content-Type: text/html; charset=UTF-8
Server: HTTP server (unknown)
Content-Length: 331
X-XSS-Protection: 1; mode=block
X-Frame-Options: SAMEORIGIN
Alternate-Protocol: 443:quic,p=1
Alt-Svc: quic=":443"; ma=604800; v="30,29,28,27,26,25"
DefaultLastHttpContent(data: SlicedAbstractByteBuf(ridx: 0, widx: 331, cap: 331/331, unwrapped: PooledUnsafeDirectByteBuf(ridx: 913, widx: 913, cap: 942)), decoderResult: success)
我的问题是什么?我不能在 Netty 的回调方法之外写入通道?
当使用 netty 时,重要的是要确保每次调用 write()
之后都会调用 flush()
,当你写入你的频道时,你写入时没有刷新,导致数据留在程序的内存中,不被发送出去
将您的呼叫从 write()
更改为 writeAndFlush()
。