How to get the response ByteBuf in the same handler that writes the request to the outboundChannel in a Netty proxy server
I am implementing a Netty proxy server as follows. An HTTP request comes in:
- if the local cache has the data, write it to the channel and flush;
- if not, fetch the data from the remote server, add it to the cache, and then flush.
I am having trouble extracting the ByteBuf of the response in the same handler where I write back to the client.
In the example below, if you look at the channelRead method of HexDumpProxyFrontendHandler, you will see how I fetch from the cache and write. I have added comments in that method where I am stuck. This code works end to end, so it can be copied and tested locally.
I can see the FullHttpResponse object in HexDumpProxyBackendHandler#channelRead. But in that method I have no reference to the cache, nor to the id that I want to add to the cache.
I think there are two ways this could be solved, but it is not clear to me how to do either.
1) Get the cache reference and the id into HexDumpProxyBackendHandler; then it becomes easy. But HexDumpProxyBackendHandler is instantiated in HexDumpProxyFrontendHandler's channelActive, and at that point I have not yet parsed the incoming request.
2) Get hold of the response ByteBuf extracted in HexDumpProxyFrontendHandler#channelRead, in which case it is just a cache insert.
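For illustration, a minimal sketch of option 1, assuming the backend handler is only created once the request has been parsed (so the id is already known) and simply receives the cache and the id through its constructor; the class and field names here are hypothetical, not part of the original example:

// Hypothetical backend handler that receives the cache and the request id, so it can
// populate the cache when the FullHttpResponse arrives from the remote server.
public class CachingBackendHandler extends ChannelInboundHandlerAdapter {
    private final Channel inboundChannel;
    private final Map<Long, String> cache;
    private final Long id;

    public CachingBackendHandler(Channel inboundChannel, Map<Long, String> cache, Long id) {
        this.inboundChannel = inboundChannel;
        this.cache = cache;
        this.id = id;
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) msg;
            // Copy the body out of the ByteBuf before the message is forwarded and released.
            cache.put(id, response.content().toString(CharsetUtil.UTF_8));
        }
        inboundChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                ctx.channel().read();
            } else {
                future.channel().close();
            }
        });
    }
}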
HexDumpProxy.java
public final class HexDumpProxy {
static final int LOCAL_PORT = Integer.parseInt(System.getProperty("localPort", "8082"));
static final String REMOTE_HOST = System.getProperty("remoteHost", "api.icndb.com");
static final int REMOTE_PORT = Integer.parseInt(System.getProperty("remotePort", "80"));
static Map<Long,String> localCache = new HashMap<>();
public static void main(String[] args) throws Exception {
System.err.println("Proxying *:" + LOCAL_PORT + " to " + REMOTE_HOST + ':' + REMOTE_PORT + " ...");
localCache.put(123L, "profile1");
localCache.put(234L, "profile2");
// Configure the bootstrap.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new HexDumpProxyInitializer(localCache, REMOTE_HOST, REMOTE_PORT))
.childOption(ChannelOption.AUTO_READ, false)
.bind(LOCAL_PORT).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
HexDumpProxyInitializer.java
public class HexDumpProxyInitializer extends ChannelInitializer<SocketChannel> {
private final String remoteHost;
private final int remotePort;
private Map<Long, String> cache;
public HexDumpProxyInitializer(Map<Long,String> cache, String remoteHost, int remotePort) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;
this.cache=cache;
}
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(
new LoggingHandler(LogLevel.INFO),
new HttpServerCodec(),
new HttpObjectAggregator(8*1024, true),
new HexDumpProxyFrontendHandler(cache, remoteHost, remotePort));
}
}
HexDumpProxyFrontendHandler.java
public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {
private final String remoteHost;
private final int remotePort;
private Channel outboundChannel;
private Map<Long, String> cache;
public HexDumpProxyFrontendHandler(Map<Long, String> cache, String remoteHost, int remotePort) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;
this.cache = cache;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
final Channel inboundChannel = ctx.channel();
// Start the connection attempt.
Bootstrap b = new Bootstrap();
b.group(inboundChannel.eventLoop())
.channel(ctx.channel().getClass())
.handler((new ChannelInitializer() {
protected void initChannel(Channel ch) {
ChannelPipeline var2 = ch.pipeline();
var2.addLast((new HttpClientCodec()));
var2.addLast(new HttpObjectAggregator(8192, true));
var2.addLast(new HexDumpProxyBackendHandler(inboundChannel));
}
}))
.option(ChannelOption.AUTO_READ, false);
ChannelFuture f = b.connect(remoteHost, remotePort);
outboundChannel = f.channel();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// connection complete start to read first data
inboundChannel.read();
} else {
// Close the connection if the connection attempt has failed.
inboundChannel.close();
}
}
});
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
System.out.println("msg is instanceof httpRequest");
HttpRequest req = (HttpRequest)msg;
QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
String userId = queryStringDecoder.parameters().get("id").get(0);
Long id = Long.valueOf(userId);
if (cache.containsKey(id)){
StringBuilder buf = new StringBuilder();
buf.append(cache.get(id));
writeResponse(req, ctx, buf);
closeOnFlush(ctx.channel());
return;
}
}
if (outboundChannel.isActive()) {
outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// was able to flush out data, start to read the next chunk
ctx.channel().read();
} else {
future.channel().close();
}
}
});
}
//get response back from HexDumpProxyBackendHander and write to cache
//basically I need to do cache.put(id, parse(response));
//how to get response buf from inboundChannel here is the question I am trying to solve
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
if (outboundChannel != null) {
closeOnFlush(outboundChannel);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
closeOnFlush(ctx.channel());
}
/**
* Closes the specified channel after all queued write requests are flushed.
*/
static void closeOnFlush(Channel ch) {
if (ch.isActive()) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
//borrowed from HttpSnoopServerHandler.java in snoop example
private boolean writeResponse(HttpRequest request, ChannelHandlerContext ctx, StringBuilder buf) {
// Decide whether to close the connection or not.
boolean keepAlive = HttpUtil.isKeepAlive(request);
// Build the response object.
FullHttpResponse response = new DefaultFullHttpResponse(
HTTP_1_1, request.decoderResult().isSuccess()? OK : BAD_REQUEST,
Unpooled.copiedBuffer(buf.toString(), CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
if (keepAlive) {
// Add 'Content-Length' header only for a keep-alive connection.
response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
// Add keep alive header as per:
// - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
// Encode the cookie.
String cookieString = request.headers().get(HttpHeaderNames.COOKIE);
if (cookieString != null) {
Set<Cookie> cookies = ServerCookieDecoder.STRICT.decode(cookieString);
if (!cookies.isEmpty()) {
// Reset the cookies if necessary.
for (io.netty.handler.codec.http.cookie.Cookie cookie: cookies) {
response.headers().add(HttpHeaderNames.SET_COOKIE, io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT.encode(cookie));
}
}
} else {
// Browser sent no cookie. Add some.
response.headers().add(HttpHeaderNames.SET_COOKIE, io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT.encode("key1", "value1"));
response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode("key2", "value2"));
}
// Write the response.
ctx.write(response);
return keepAlive;
}
}
HexDumpProxyBackendHandler.java
public class HexDumpProxyBackendHandler extends ChannelInboundHandlerAdapter {
private final Channel inboundChannel;
public HexDumpProxyBackendHandler(Channel inboundChannel) {
this.inboundChannel = inboundChannel;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.read();
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (msg instanceof FullHttpResponse) {
System.out.println("this is fullHttpResponse");
}
inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
ctx.channel().read();
} else {
future.channel().close();
}
}
});
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
HexDumpProxyFrontendHandler.closeOnFlush(inboundChannel);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
HexDumpProxyFrontendHandler.closeOnFlush(ctx.channel());
}
}
P.S.: I took most of this code from the netty-example project and customized it.
EDIT
Following Ferrygig's suggestion, I changed FrontEndChannelHandler#channelRead as below. I have removed channelActive and implemented the write method.
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
System.out.println("msg is instanceof httpRequest");
HttpRequest req = (HttpRequest)msg;
QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
String userId = queryStringDecoder.parameters().get("id").get(0);
id = Long.valueOf(userId);
if (cache.containsKey(id)){
StringBuilder buf = new StringBuilder();
buf.append(cache.get(id));
writeResponse(req, ctx, buf);
closeOnFlush(ctx.channel());
return;
}
final Channel inboundChannel = ctx.channel();
//copied from channelActive method
// Start the connection attempt.
Bootstrap b = new Bootstrap();
b.group(inboundChannel.eventLoop())
.channel(ctx.channel().getClass())
.handler((new ChannelInitializer() {
protected void initChannel(Channel ch) {
ChannelPipeline var2 = ch.pipeline();
var2.addLast((new HttpClientCodec()));
var2.addLast(new HttpObjectAggregator(8192, true));
var2.addLast(new HexDumpProxyBackendHandler(inboundChannel, cache));
}
}));
//.option(ChannelOption.AUTO_READ, false);
ChannelFuture f = b.connect(remoteHost, remotePort);
outboundChannel = f.channel();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// connection complete start to read first data
inboundChannel.read();
} else {
// Close the connection if the connection attempt has failed.
inboundChannel.close();
}
}
});
}
if (outboundChannel.isActive()) {
outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// was able to flush out data, start to read the next chunk
ctx.channel().read();
} else {
future.channel().close();
}
}
});
}
}
I may be wrong, but when I read this part of your HexDumpProxyFrontendHandler, I feel that something might not be right (I have placed my comments a bit ahead of the relevant lines, in proper comment style, so that they are visible):
// Not incorrect but better to have only one bootstrap and reusing it
Bootstrap b = new Bootstrap();
b.group(inboundChannel.eventLoop())
.channel(ctx.channel().getClass())
.handler(new HexDumpProxyBackendHandler(inboundChannel))
// I know what AUTO_READ false is, but my question is why you need it?
.option(ChannelOption.AUTO_READ, false);
ChannelFuture f = b.connect(remoteHost, remotePort);
// Strange to me to try to get the channel while you did not test yet it is linked
outboundChannel = f.channel();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// Maybe you should start to send there, therefore getting the outboundChannel right there?
// add a log in order to see if you come there
// probably you have to send first, before asking to read anything?
// position (1)
inboundChannel.read();
} else {
inboundChannel.close();
}
}
});
// I suggest to move this in position named (1)
if (outboundChannel.isActive()) {
// maybe a log to see if anything will be written?
outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
System.out.println("success!! - FrontEndHandler");
ctx.channel().read();
} else {
future.channel().close();
}
}
});
}
To me it looks like you are not waiting for the channel to be connected before writing. Some logs are also missing around the point where you write to the wire, to make sure you really sent something (in the logs we mostly only see the connection opening and then closing, with nothing in between). Maybe more logs could help both us and you?
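A minimal sketch of the suggestion above, assuming the first message is forwarded from inside the connect listener once the outbound connection has succeeded (the "position (1)" marked in the commented code); lambdas are used for brevity and require Java 8+:

// Sketch: write to the backend only after the connection attempt has completed,
// instead of calling outboundChannel.writeAndFlush(msg) right after connect(...).
ChannelFuture f = b.connect(remoteHost, remotePort);
outboundChannel = f.channel();
f.addListener((ChannelFutureListener) future -> {
    if (future.isSuccess()) {
        System.out.println("connected to backend - FrontEndHandler");
        // position (1): the outbound channel is now active, so it is safe to write.
        outboundChannel.writeAndFlush(msg).addListener((ChannelFutureListener) writeFuture -> {
            if (writeFuture.isSuccess()) {
                System.out.println("success!! - FrontEndHandler");
                ctx.channel().read();
            } else {
                writeFuture.channel().close();
            }
        });
    } else {
        // Close the client connection if the backend connection failed.
        inboundChannel.close();
    }
});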
There are multiple ways to approach this problem, and they differ depending on your end goal.
At the moment you use a topology of 1 inbound connection to 1 outbound connection, which makes the system design slightly easier since you do not have to worry about synchronizing multiple requests onto the same outbound stream.
At the moment, your frontend handler extends ChannelInboundHandlerAdapter, which only intercepts the "packets" coming into your application. If we make it extend ChannelDuplexHandler instead, we can also handle the "packets" going out of the application.
To go down this path, we need to update the HexDumpProxyFrontendHandler class so that it extends ChannelDuplexHandler (let's call it CDH for now).
The next step in the process is to override the write method coming from the CDH, so that we can intercept the moment the backend sends the response back to us.
Once the write method is created, we need to update our (non-thread-safe) map by calling its put method:
public class HexDumpProxyFrontendHandler extends ChannelDuplexHandler {
Long lastId;
// ...
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
System.out.println("msg is instanceof httpRequest");
HttpRequest req = (HttpRequest)msg;
QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
String userId = queryStringDecoder.parameters().get("id").get(0);
Long id = Long.valueOf(userId);
lastId = id; // Store ID of last request
// ...
}
// ...
}
// ...
public void write(
ChannelHandlerContext ctx,
java.lang.Object msg,
ChannelPromise promise
) throws java.lang.Exception {
if (msg instanceof FullHttpResponse) {
System.out.println("this is fullHttpResponse");
FullHttpResponse full = (FullHttpResponse)msg;
cache.put(lastId, parse(full)); // TODO: Include a system here to convert the request to a string
}
super.write(ctx, msg, promise);
}
// ...
}
We are not done here; while we have the code in place, we still need to fix a few bugs elsewhere in the code.
Non-thread-safe map (severe bug)
One of those bugs is that you use a plain HashMap to hold your cache. The problem is that it is not thread safe: if multiple people connect to your app at the same time, weird things may happen, including full corruption of the map as its internal structure is updated.
To counter this issue, we will "upgrade" the map to a ConcurrentHashMap. This map has special structures for dealing with multiple threads requesting and storing data at the same time, without a large performance penalty. (If performance is a major concern, you might get higher performance by using per-thread hash maps instead of a global cache, but this means that every resource may be cached up to as many times as there are threads.)
No cache eviction rule (major bug)
At the moment there is no code that removes outdated resources, which means the cache will fill up until the program has no memory left, at which point it will crash.
This can be solved either by using a map implementation that offers both thread-safe access and so-called eviction rules, or by using a pre-made caching solution such as Guava caches.
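As an illustration only, a sketch using Guava's Cache (assuming the Guava dependency is on the classpath); the size and expiry values are arbitrary:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;

// A bounded, expiring, thread-safe cache in place of the plain HashMap.
Cache<Long, String> localCache = CacheBuilder.newBuilder()
        .maximumSize(10_000)                     // evict entries once the cache grows past this size
        .expireAfterWrite(10, TimeUnit.MINUTES)  // drop entries 10 minutes after they were stored
        .build();

// Usage: getIfPresent returns null on a miss, put stores a value.
String cached = localCache.getIfPresent(123L);
if (cached == null) {
    localCache.put(123L, "profile1");
}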
Failure to handle HTTP pipelining correctly (minor bug)
One of the lesser-known features of HTTP is pipelining, which basically means that a client may send another request to the server without waiting for the response to the previous request. Bugs of this type include servers that swap the contents of two requests, or even mangle them completely.
Although pipelined requests are rare these days, with more and more HTTP/2 support and the knowledge that there are broken servers out there, it still happens with certain CLI tools that use it.
To solve this problem, only read the next request after the previous response has been sent; one way to do that is to keep a list of pending requests, or to go for more advanced pre-made solutions.
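A rough sketch of the queue-based approach inside the frontend handler (as a ChannelDuplexHandler), assuming one pending id per in-flight request; the pendingIds field is hypothetical and needs java.util.ArrayDeque / java.util.Deque imports. No extra synchronization is needed because all events for a given connection run on the same event loop:

// Remember the ids of forwarded requests in order, so each response coming back
// through write(...) can be matched to the request it belongs to.
private final Deque<Long> pendingIds = new ArrayDeque<>();

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof HttpRequest) {
        HttpRequest req = (HttpRequest) msg;
        Long id = Long.valueOf(new QueryStringDecoder(req.uri()).parameters().get("id").get(0));
        pendingIds.addLast(id);  // queue the id of this in-flight request
    }
    // ... forward the request to the backend as before ...
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    if (msg instanceof FullHttpResponse) {
        Long id = pendingIds.pollFirst();  // oldest request that has not been answered yet
        if (id != null) {
            cache.put(id, ((FullHttpResponse) msg).content().toString(CharsetUtil.UTF_8));
        }
    }
    super.write(ctx, msg, promise);
}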