使用 Netty 限制 HTTP/2 中的流读取

Throttling reads on streams in HTTP/2 with Netty

在 Netty 4 proxy example AUTO_READ is turned off, which according to 中,通道向代理发送大量数据不会占用太多缓冲区,这是有道理的。

但是,我的理解是,在HTTP/2中,您永远不应该延迟读取,而是逐个流地处理流控制。不过我不知道如何在 Netty 中执行此操作。

感觉应该是在Http2ConnectionHandler上返回onDataRead中的0来实现。以下人为设计的示例是我预期它的工作方式,其中字节未消耗 100 毫秒:

@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data,
                      int padding, boolean endOfStream) throws Http2Exception {
    int processed = data.readableBytes() + padding;

    Http2LocalFlowController flowController = decoder().flowController();
    Http2Stream stream = connection().stream(streamId);
    executorService.schedule(() -> {
        ctx.executor().execute(() -> {
            try {
                // ...do stuff...
                flowController.consumeBytes(stream, processed);
            } catch (Http2Exception e) {
                e.printStackTrace(); // now what?
            }
        });
    }, 100, TimeUnit.MILLISECONDS);

    return 0;
}

当我使用它时,没有抛出异常,客户端只是超时。 (请注意,如果我在从 onDataRead 返回之前调用 consumeBytes 调用,那么它确实有效。)

通过一些额外的日志记录,在读取 16k 条消息时,我可以立即看到 65535 字节未使用。开始消费后,据说32k后发送了一个window帧。在第 4 条消息之后,flowController.unconsumedBytes(stream) 报告 0 个字节未使用,但是下一条消息永远不会到达 onDataRead

这显然适用于其他人,所以我正在努力寻找问题所在。

consumeBytes 方法 returns 一个布尔值,指示是否达到阈值,意味着可以为流发送 window 更新(即表示流可以发送更多数据)。

该方法对 window 更新帧执行 ctx.write,但不刷新它。所以如果没有刷新上下文,你需要这样做:

if (flowController.consumeBytes(stream, processed)) {
    ctx.flush();
}