支持流式传输和 http 缓存的 Netty 代理管道
Netty Proxy pipeline to support streaming and http caching
我正在尝试使用 Netty 构建代理服务,以支持流式调用和 HTTP/1 流量。
我已经成功地做到了这一点,但是当我尝试向管道中添加 HttpObjectAggregator 时 运行 遇到了问题。我需要聚合器来构建用于报告的 FullHttpResponse。
我当前的设置使用 2 个 ChannelInitializers 和 2 个业务逻辑处理程序
启动代理服务:
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventLoopGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new SourceTransportHandlerInitializer())
.childOption(ChannelOption.AUTO_READ, false);
Channel channel;
try {
channel = serverBootstrap.bind(localPort).sync().channel();
channel.closeFuture().sync();
}catch (InterruptedException e){
// oh no
}
SourceTransportHandlerInitializer.java
public class SourceTransportHandlerInitializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeLine = socketChannel.pipeline();
pipeLine.addLast(new HttpServerCodec(102400,102400,102400));
pipeLine.addLast(new HttpObjectAggregator(1048576));
pipeLine.addLast(new SourceHandler());
pipeLine.addLast(new LoggingHandler(LogLevel.INFO));
}
}
SourceHandler.java
public class SourceHandler extends ChannelInboundHandlerAdapter {
private volatile Channel outboundChannel;
@Override
public void channelActive(ChannelHandlerContext context) {
final Channel inChannel = context.channel();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(inChannel.eventLoop())
.channel(context.channel().getClass())
.handler(new TargetTransportHandlerInitializer(inChannel))
.option(ChannelOption.AUTO_READ, false);
ChannelFuture channelFuture = bootstrap.connect(Constants.host, Constants.hostPort);
outboundChannel = channelFuture.channel();
channelFuture.addListener((ChannelFutureListener) channelFuture1 -> {
if (channelFuture1.isSuccess()) {
inChannel.read();
} else {
inChannel.close();
}
});
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (msg instanceof FullHttpRequest) {
//record request
}
if (outboundChannel.isActive()) {
outboundChannel.writeAndFlush(msg).addListener((ChannelFutureListener) channelFuture -> {
if (channelFuture.isSuccess()) {
// flush and read data
ctx.channel().read();
} else {
channelFuture.channel().close();
}
});
} else {
LOG.debug("SourceHandler did not read. Outbound Channel not active");
}
}
...
}
TargetTransportHandlerInitializer.java
public class TargetTransportHandlerInitializer extends ChannelInitializer<SocketChannel>{
protected final Channel inChannel;
public TargetTransportHandlerInitializer (final Channel inChannel){
this.inChannel = inChannel;
}
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeLine = socketChannel.pipeline();
pipeLine.addLast("codec", new HttpClientCodec(102400, 102400, 102400));
//pipeLine.addLast(new HttpObjectAggregator(1048576));
pipeLine.addLast(new TargetHandler(inChannel));
}
}
TargetHandler.java
public class TargetHandler extends ChannelInboundHandlerAdapter {
private final Channel inChannel;
public TargetHandler(Channel inChannel) {
this.inChannel = inChannel;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.read();
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (msg instanceof FullHttpResponse ) {
//record response
} else if (msg instanceof DefaultHttpResponse) {
// convert to FullHttpResponse ?
}
inChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
ctx.channel().read();
} else {
future.channel().close();
}
});
}
...
}
源初始化器管道中的 HttpObjectAggregator
没有问题,并且允许我记录 FullHttpRequest。如果我在 TargetTransportHandlerInitializer
class 中评论 pipeLine.addLast(new HttpObjectAggregator(1048576));
,则代理可以完美运行。如果我删除该评论,流式调用将开始失败。如果没有聚合器,msg
object 只是 HTTP/1 调用的 DefaultHttpResponse,我无法访问 body 和 headers.
我是否需要某种流式传输避开聚合器的条件管道?或者是否有其他方法可以从 DefaultHttpResponse
?
构建 FullHttpResponse
我真的希望我能做到这一点 FullHttpResponse full = HttpObjectAggregator.aggregate(default)
就此结束。
Netty 的 HTTP 编解码器以两种不同的方式工作:
- 聚合:使用您提到的对象聚合器,它会生成
FullHttpRequest
/FullHttpResponse
个对象。
- 非聚合(流式):它为头、有效负载和尾部生成不同的消息,它们是
DefaultHttpRequest/Response
、HttpContent
和 LastHttpContent
。
转换消息的惯用 netty 方法是在管道中添加一个处理程序,因此您不会有像 FullHttpResponse full = HttpObjectAggregator.aggregate(default)
这样的 API,而是您将在 HttpObjectAggregator
之后插入一个处理程序并接收FullHttpResponse
对象。
以上是一个 either/or 选择,因此如果您添加聚合器,聚合器之后的处理程序将只能看到聚合消息,而在聚合器之前,他们将看到我上面提到的不同消息。
以下示例演示了如何处理流式消息:
我能够通过对 Netty 项目中的 PortUnification 示例进行轻微修改来完成此操作。
我用以下内容更新了 TargetTransportHandlerInitializer:
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeLine = socketChannel.pipeline();
pipeLine.addLast("codec", new HttpClientCodec(102400, 102400, 102400));
pipeLine.addLast("switch", new SwitchHandler(inChannel));
}
SwitchHandler
是一个 ChannelInboundHandlerAdapter
,具有以下内容:
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (recordable(msg))
createRecordingChannel(ctx);
else
createStreamingChannel(ctx);
ctx.fireChannelRead(msg);
}
private void createRecordingChannel(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast("aggregator", new HttpObjectAggregator(1048576));
p.addLast("recordingHandler", new RecordingTargetHandler(inChannel));
p.remove(this);
}
private void createStreamingChannel(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast("simpleHandler", new SimpleTargetHandler(inChannel));
p.remove(this);
}
recordable()
包含一些业务逻辑来查看消息并做出决定。在我的例子中是在看 Headers.
我的两个新处理程序看起来像原来的 TargetHandler,具有不同的 channelRead
方法。
SimpleTargetHandler
将 ChannelInboundHandlerAdapter
扩展为
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
inChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
ctx.channel().read();
} else {
future.channel().close();
}
});
}
RecordingTargetHandler
扩展 SimpleTargetHandler
为:
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (msg instanceof FullHttpResponse ) {
// record response
}
super.channelRead(ctx, msg);
}
我正在尝试使用 Netty 构建代理服务,以支持流式调用和 HTTP/1 流量。
我已经成功地做到了这一点,但是当我尝试向管道中添加 HttpObjectAggregator 时 运行 遇到了问题。我需要聚合器来构建用于报告的 FullHttpResponse。
我当前的设置使用 2 个 ChannelInitializers 和 2 个业务逻辑处理程序
启动代理服务:
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventLoopGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new SourceTransportHandlerInitializer())
.childOption(ChannelOption.AUTO_READ, false);
Channel channel;
try {
channel = serverBootstrap.bind(localPort).sync().channel();
channel.closeFuture().sync();
}catch (InterruptedException e){
// oh no
}
SourceTransportHandlerInitializer.java
public class SourceTransportHandlerInitializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeLine = socketChannel.pipeline();
pipeLine.addLast(new HttpServerCodec(102400,102400,102400));
pipeLine.addLast(new HttpObjectAggregator(1048576));
pipeLine.addLast(new SourceHandler());
pipeLine.addLast(new LoggingHandler(LogLevel.INFO));
}
}
SourceHandler.java
public class SourceHandler extends ChannelInboundHandlerAdapter {
private volatile Channel outboundChannel;
@Override
public void channelActive(ChannelHandlerContext context) {
final Channel inChannel = context.channel();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(inChannel.eventLoop())
.channel(context.channel().getClass())
.handler(new TargetTransportHandlerInitializer(inChannel))
.option(ChannelOption.AUTO_READ, false);
ChannelFuture channelFuture = bootstrap.connect(Constants.host, Constants.hostPort);
outboundChannel = channelFuture.channel();
channelFuture.addListener((ChannelFutureListener) channelFuture1 -> {
if (channelFuture1.isSuccess()) {
inChannel.read();
} else {
inChannel.close();
}
});
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (msg instanceof FullHttpRequest) {
//record request
}
if (outboundChannel.isActive()) {
outboundChannel.writeAndFlush(msg).addListener((ChannelFutureListener) channelFuture -> {
if (channelFuture.isSuccess()) {
// flush and read data
ctx.channel().read();
} else {
channelFuture.channel().close();
}
});
} else {
LOG.debug("SourceHandler did not read. Outbound Channel not active");
}
}
...
}
TargetTransportHandlerInitializer.java
public class TargetTransportHandlerInitializer extends ChannelInitializer<SocketChannel>{
protected final Channel inChannel;
public TargetTransportHandlerInitializer (final Channel inChannel){
this.inChannel = inChannel;
}
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeLine = socketChannel.pipeline();
pipeLine.addLast("codec", new HttpClientCodec(102400, 102400, 102400));
//pipeLine.addLast(new HttpObjectAggregator(1048576));
pipeLine.addLast(new TargetHandler(inChannel));
}
}
TargetHandler.java
public class TargetHandler extends ChannelInboundHandlerAdapter {
private final Channel inChannel;
public TargetHandler(Channel inChannel) {
this.inChannel = inChannel;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.read();
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (msg instanceof FullHttpResponse ) {
//record response
} else if (msg instanceof DefaultHttpResponse) {
// convert to FullHttpResponse ?
}
inChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
ctx.channel().read();
} else {
future.channel().close();
}
});
}
...
}
源初始化器管道中的 HttpObjectAggregator
没有问题,并且允许我记录 FullHttpRequest。如果我在 TargetTransportHandlerInitializer
class 中评论 pipeLine.addLast(new HttpObjectAggregator(1048576));
,则代理可以完美运行。如果我删除该评论,流式调用将开始失败。如果没有聚合器,msg
object 只是 HTTP/1 调用的 DefaultHttpResponse,我无法访问 body 和 headers.
我是否需要某种流式传输避开聚合器的条件管道?或者是否有其他方法可以从 DefaultHttpResponse
?
FullHttpResponse
我真的希望我能做到这一点 FullHttpResponse full = HttpObjectAggregator.aggregate(default)
就此结束。
Netty 的 HTTP 编解码器以两种不同的方式工作:
- 聚合:使用您提到的对象聚合器,它会生成
FullHttpRequest
/FullHttpResponse
个对象。 - 非聚合(流式):它为头、有效负载和尾部生成不同的消息,它们是
DefaultHttpRequest/Response
、HttpContent
和LastHttpContent
。
转换消息的惯用 netty 方法是在管道中添加一个处理程序,因此您不会有像 FullHttpResponse full = HttpObjectAggregator.aggregate(default)
这样的 API,而是您将在 HttpObjectAggregator
之后插入一个处理程序并接收FullHttpResponse
对象。
以上是一个 either/or 选择,因此如果您添加聚合器,聚合器之后的处理程序将只能看到聚合消息,而在聚合器之前,他们将看到我上面提到的不同消息。
以下示例演示了如何处理流式消息:
我能够通过对 Netty 项目中的 PortUnification 示例进行轻微修改来完成此操作。
我用以下内容更新了 TargetTransportHandlerInitializer:
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeLine = socketChannel.pipeline();
pipeLine.addLast("codec", new HttpClientCodec(102400, 102400, 102400));
pipeLine.addLast("switch", new SwitchHandler(inChannel));
}
SwitchHandler
是一个 ChannelInboundHandlerAdapter
,具有以下内容:
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (recordable(msg))
createRecordingChannel(ctx);
else
createStreamingChannel(ctx);
ctx.fireChannelRead(msg);
}
private void createRecordingChannel(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast("aggregator", new HttpObjectAggregator(1048576));
p.addLast("recordingHandler", new RecordingTargetHandler(inChannel));
p.remove(this);
}
private void createStreamingChannel(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast("simpleHandler", new SimpleTargetHandler(inChannel));
p.remove(this);
}
recordable()
包含一些业务逻辑来查看消息并做出决定。在我的例子中是在看 Headers.
我的两个新处理程序看起来像原来的 TargetHandler,具有不同的 channelRead
方法。
SimpleTargetHandler
将 ChannelInboundHandlerAdapter
扩展为
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
inChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
ctx.channel().read();
} else {
future.channel().close();
}
});
}
RecordingTargetHandler
扩展 SimpleTargetHandler
为:
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (msg instanceof FullHttpResponse ) {
// record response
}
super.channelRead(ctx, msg);
}