使用 Netty HTTP 客户端重试请求

Retry requests with Netty HTTP client

如何在基于 Netty 的 HTTP 客户端中重试 HTTP 请求?

考虑以下处理程序,如果收到 HTTP 响应代码 503,它会在 1 秒后尝试重试 HTTP 请求:

public class RetryChannelHandler extends ChannelDuplexHandler {
    List<HttpObject> requestParts;

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof HttpRequest) {
            requestParts = new ArrayList<>();
            requestParts.add((HttpRequest)msg);
        } else if (msg instanceof HttpObject) {
            requestParts.add((HttpObject)msg);
        }

        super.write(ctx, msg, promise);
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HttpResponse) {
            HttpResponse res = (HttpResponse)msg;
            if (res.status().code() == 503) {
                ctx.executor().schedule(new Runnable() {
                    @Override
                    public void run() {
                        for (HttpObject obj : requestParts) {
                            ctx.channel().write(obj);
                        }
                    }
                }, 1000, TimeUnit.MILLISECONDS);
            } else {
                super.channelRead(ctx, msg);
            }
        } else {
            super.channelRead(ctx, msg);
        }
    }
}

当我在此示例中写入通道时,管道中的其他处理程序看到 HttpObjects,但实际上并没有再次执行 HttpRequest——只收到一个 HttpResponse。

我认为我在这种情况下只是误用了 Channel,我需要创建一个新的 Channel(代表与服务器的新连接)来执行重试。我不清楚的是如何从处理程序的上下文中创建新的通道,以及我是否真的在正确的 Netty 层中执行这种逻辑。

任何关于如何实现我所描述的行为的指导都将不胜感激。

您还需要在调用 write(...) 之后调用 flush(),否则它不会被刷新到通道中。此外,您还需要确保您可能保留()和重复()HttpContent,否则您最终可能会尝试编写一个已经发布的 HttpContent 对象。

类似这样(未测试):

public class RetryChannelHandler extends ChannelDuplexHandler {
    Queue<HttpObject> requestParts;

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof HttpRequest) {
            requestParts = new ArrayDeque<>();
            requestParts.add((HttpRequest)msg);
        } else if (msg instanceof HttpContent) {
            requestParts.add(((HttpContent)msg).duplicate().retain());
        }

        super.write(ctx, msg, promise);
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HttpResponse) {
            HttpResponse res = (HttpResponse)msg;
            if (res.status().code() == 503) {
                ctx.executor().schedule(new Runnable() {
                    @Override
                    public void run() {
                        HttpObject obj;
                        while ((obj = requestParts.poll()) != null) {
                            ctx.write(obj);
                        }
                        ctx.flush();
                    }
                }, 1000, TimeUnit.MILLISECONDS);
            } else {
                HttpObject obj;
                while ((obj = requestParts.poll()) != null) {
                    ReferenceCountUtil.release(obj);
                }
                super.channelRead(ctx, msg);
            }
        } else {
            super.channelRead(ctx, msg);
        }
    }
}