支持流式传输和 http 缓存的 Netty 代理管道

Netty Proxy pipeline to support streaming and http caching

我正在尝试使用 Netty 构建代理服务,以支持流式调用和 HTTP/1 流量。

我已经成功地做到了这一点,但是当我尝试向管道中添加 HttpObjectAggregator 时 运行 遇到了问题。我需要聚合器来构建用于报告的 FullHttpResponse。

我当前的设置使用 2 个 ChannelInitializers 和 2 个业务逻辑处理程序

启动代理服务:

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventLoopGroup)
    .channel(NioServerSocketChannel.class)
    .handler(new LoggingHandler(LogLevel.INFO))
    .childHandler(new SourceTransportHandlerInitializer())
    .childOption(ChannelOption.AUTO_READ, false);

Channel channel;
try {
    channel = serverBootstrap.bind(localPort).sync().channel();
    channel.closeFuture().sync();
}catch (InterruptedException e){
    // oh no
}

SourceTransportHandlerInitializer.java

public class SourceTransportHandlerInitializer extends ChannelInitializer<SocketChannel>{

    @Override
    protected void initChannel(SocketChannel socketChannel) {
        ChannelPipeline pipeLine = socketChannel.pipeline();
        pipeLine.addLast(new HttpServerCodec(102400,102400,102400));
        pipeLine.addLast(new HttpObjectAggregator(1048576));
        pipeLine.addLast(new SourceHandler());
        pipeLine.addLast(new LoggingHandler(LogLevel.INFO));
    }

}

SourceHandler.java

public class SourceHandler extends ChannelInboundHandlerAdapter {

    private volatile Channel outboundChannel;

    @Override
    public void channelActive(ChannelHandlerContext context) {
        final Channel inChannel = context.channel();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(inChannel.eventLoop())
                .channel(context.channel().getClass())
                .handler(new TargetTransportHandlerInitializer(inChannel)) 
                .option(ChannelOption.AUTO_READ, false);
        ChannelFuture channelFuture = bootstrap.connect(Constants.host, Constants.hostPort);
        outboundChannel = channelFuture.channel();

        channelFuture.addListener((ChannelFutureListener) channelFuture1 -> {
            if (channelFuture1.isSuccess()) {
                inChannel.read();
            } else {
                inChannel.close();
            }
        });
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {

        if (msg instanceof FullHttpRequest) {
            //record request
        }

        if (outboundChannel.isActive()) {
            outboundChannel.writeAndFlush(msg).addListener((ChannelFutureListener) channelFuture -> {
                if (channelFuture.isSuccess()) {
                    // flush and read data
                    ctx.channel().read();
                } else {
                    channelFuture.channel().close();
                }
            });
        } else {
            LOG.debug("SourceHandler did not read. Outbound Channel not active");
        }
    }

...

}

TargetTransportHandlerInitializer.java

public class TargetTransportHandlerInitializer extends ChannelInitializer<SocketChannel>{

    protected final Channel inChannel;

    public TargetTransportHandlerInitializer (final Channel inChannel){
        this.inChannel = inChannel;
    }

    @Override
    protected void initChannel(SocketChannel socketChannel) {
        ChannelPipeline pipeLine = socketChannel.pipeline();

        pipeLine.addLast("codec", new HttpClientCodec(102400, 102400, 102400)); 
        //pipeLine.addLast(new HttpObjectAggregator(1048576));
        pipeLine.addLast(new TargetHandler(inChannel));
    }

}

TargetHandler.java

public class TargetHandler extends ChannelInboundHandlerAdapter {

    private final Channel inChannel;

    public TargetHandler(Channel inChannel) {
        this.inChannel = inChannel;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.read();
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {

        if (msg instanceof FullHttpResponse ) {
            //record response
        } else if (msg instanceof DefaultHttpResponse) {
            // convert to FullHttpResponse ?
        }

        inChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                ctx.channel().read();
            } else {
                future.channel().close();
            }
        });
    }

...

}

源初始化器管道中的 HttpObjectAggregator 没有问题,并且允许我记录 FullHttpRequest。如果我在 TargetTransportHandlerInitializer class 中评论 pipeLine.addLast(new HttpObjectAggregator(1048576));,则代理可以完美运行。如果我删除该评论,流式调用将开始失败。如果没有聚合器,msg object 只是 HTTP/1 调用的 DefaultHttpResponse,我无法访问 body 和 headers.

我是否需要某种流式传输避开聚合器的条件管道?或者是否有其他方法可以从 DefaultHttpResponse?

构建 FullHttpResponse

我真的希望我能做到这一点 FullHttpResponse full = HttpObjectAggregator.aggregate(default) 就此结束。

Netty 的 HTTP 编解码器以两种不同的方式工作:

  • 聚合:使用您提到的对象聚合器,它会生成 FullHttpRequest/FullHttpResponse 个对象。
  • 非聚合(流式):它为头、有效负载和尾部生成不同的消息,它们是 DefaultHttpRequest/ResponseHttpContentLastHttpContent

转换消息的惯用 netty 方法是在管道中添加一个处理程序,因此您不会有像 FullHttpResponse full = HttpObjectAggregator.aggregate(default) 这样的 API,而是您将在 HttpObjectAggregator 之后插入一个处理程序并接收FullHttpResponse 对象。

以上是一个 either/or 选择,因此如果您添加聚合器,聚合器之后的处理程序将只能看到聚合消息,而在聚合器之前,他们将看到我上面提到的不同消息。

以下示例演示了如何处理流式消息:

https://github.com/netty/netty/blob/4.1/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClientHandler.java#L30

我能够通过对 Netty 项目中的 PortUnification 示例进行轻微修改来完成此操作。

我用以下内容更新了 TargetTransportHandlerInitializer:

@Override
protected void initChannel(SocketChannel socketChannel) {
    ChannelPipeline pipeLine = socketChannel.pipeline();

    pipeLine.addLast("codec", new HttpClientCodec(102400, 102400, 102400));
    pipeLine.addLast("switch", new SwitchHandler(inChannel));
    
}

SwitchHandler 是一个 ChannelInboundHandlerAdapter,具有以下内容:

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {

    if (recordable(msg))
        createRecordingChannel(ctx);
    else
        createStreamingChannel(ctx);

    ctx.fireChannelRead(msg);
}

private void createRecordingChannel(ChannelHandlerContext ctx) {
    ChannelPipeline p = ctx.pipeline();
    p.addLast("aggregator", new HttpObjectAggregator(1048576));
    p.addLast("recordingHandler", new RecordingTargetHandler(inChannel));
    p.remove(this);
}

private void createStreamingChannel(ChannelHandlerContext ctx) {
    ChannelPipeline p = ctx.pipeline();
    p.addLast("simpleHandler", new SimpleTargetHandler(inChannel));
    p.remove(this);
}

recordable() 包含一些业务逻辑来查看消息并做出决定。在我的例子中是在看 Headers.

我的两个新处理程序看起来像原来的 TargetHandler,具有不同的 channelRead 方法。

SimpleTargetHandlerChannelInboundHandlerAdapter 扩展为

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {

    inChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
        if (future.isSuccess()) {
            ctx.channel().read();
        } else {
            future.channel().close();
        }
    });
}

RecordingTargetHandler 扩展 SimpleTargetHandler 为:

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {

    if (msg instanceof FullHttpResponse ) {
        // record response
    }

    super.channelRead(ctx, msg);
}