使用 Netty HTTP 客户端重试请求
Retry requests with Netty HTTP client
如何在基于 Netty 的 HTTP 客户端中重试 HTTP 请求?
考虑以下处理程序,如果收到 HTTP 响应代码 503,它会在 1 秒后尝试重试 HTTP 请求:
public class RetryChannelHandler extends ChannelDuplexHandler {
List<HttpObject> requestParts;
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof HttpRequest) {
requestParts = new ArrayList<>();
requestParts.add((HttpRequest)msg);
} else if (msg instanceof HttpObject) {
requestParts.add((HttpObject)msg);
}
super.write(ctx, msg, promise);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse) {
HttpResponse res = (HttpResponse)msg;
if (res.status().code() == 503) {
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
for (HttpObject obj : requestParts) {
ctx.channel().write(obj);
}
}
}, 1000, TimeUnit.MILLISECONDS);
} else {
super.channelRead(ctx, msg);
}
} else {
super.channelRead(ctx, msg);
}
}
}
当我在此示例中写入通道时,管道中的其他处理程序看到 HttpObjects,但实际上并没有再次执行 HttpRequest——只收到一个 HttpResponse。
我认为我在这种情况下只是误用了 Channel,我需要创建一个新的 Channel(代表与服务器的新连接)来执行重试。我不清楚的是如何从处理程序的上下文中创建新的通道,以及我是否真的在正确的 Netty 层中执行这种逻辑。
任何关于如何实现我所描述的行为的指导都将不胜感激。
您还需要在调用 write(...)
之后调用 flush()
,否则它不会被刷新到通道中。此外,您还需要确保您可能保留()和重复()HttpContent
,否则您最终可能会尝试编写一个已经发布的 HttpContent
对象。
类似这样(未测试):
public class RetryChannelHandler extends ChannelDuplexHandler {
Queue<HttpObject> requestParts;
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof HttpRequest) {
requestParts = new ArrayDeque<>();
requestParts.add((HttpRequest)msg);
} else if (msg instanceof HttpContent) {
requestParts.add(((HttpContent)msg).duplicate().retain());
}
super.write(ctx, msg, promise);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse) {
HttpResponse res = (HttpResponse)msg;
if (res.status().code() == 503) {
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
HttpObject obj;
while ((obj = requestParts.poll()) != null) {
ctx.write(obj);
}
ctx.flush();
}
}, 1000, TimeUnit.MILLISECONDS);
} else {
HttpObject obj;
while ((obj = requestParts.poll()) != null) {
ReferenceCountUtil.release(obj);
}
super.channelRead(ctx, msg);
}
} else {
super.channelRead(ctx, msg);
}
}
}
如何在基于 Netty 的 HTTP 客户端中重试 HTTP 请求?
考虑以下处理程序,如果收到 HTTP 响应代码 503,它会在 1 秒后尝试重试 HTTP 请求:
public class RetryChannelHandler extends ChannelDuplexHandler {
List<HttpObject> requestParts;
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof HttpRequest) {
requestParts = new ArrayList<>();
requestParts.add((HttpRequest)msg);
} else if (msg instanceof HttpObject) {
requestParts.add((HttpObject)msg);
}
super.write(ctx, msg, promise);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse) {
HttpResponse res = (HttpResponse)msg;
if (res.status().code() == 503) {
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
for (HttpObject obj : requestParts) {
ctx.channel().write(obj);
}
}
}, 1000, TimeUnit.MILLISECONDS);
} else {
super.channelRead(ctx, msg);
}
} else {
super.channelRead(ctx, msg);
}
}
}
当我在此示例中写入通道时,管道中的其他处理程序看到 HttpObjects,但实际上并没有再次执行 HttpRequest——只收到一个 HttpResponse。
我认为我在这种情况下只是误用了 Channel,我需要创建一个新的 Channel(代表与服务器的新连接)来执行重试。我不清楚的是如何从处理程序的上下文中创建新的通道,以及我是否真的在正确的 Netty 层中执行这种逻辑。
任何关于如何实现我所描述的行为的指导都将不胜感激。
您还需要在调用 write(...)
之后调用 flush()
,否则它不会被刷新到通道中。此外,您还需要确保您可能保留()和重复()HttpContent
,否则您最终可能会尝试编写一个已经发布的 HttpContent
对象。
类似这样(未测试):
public class RetryChannelHandler extends ChannelDuplexHandler {
Queue<HttpObject> requestParts;
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof HttpRequest) {
requestParts = new ArrayDeque<>();
requestParts.add((HttpRequest)msg);
} else if (msg instanceof HttpContent) {
requestParts.add(((HttpContent)msg).duplicate().retain());
}
super.write(ctx, msg, promise);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse) {
HttpResponse res = (HttpResponse)msg;
if (res.status().code() == 503) {
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
HttpObject obj;
while ((obj = requestParts.poll()) != null) {
ctx.write(obj);
}
ctx.flush();
}
}, 1000, TimeUnit.MILLISECONDS);
} else {
HttpObject obj;
while ((obj = requestParts.poll()) != null) {
ReferenceCountUtil.release(obj);
}
super.channelRead(ctx, msg);
}
} else {
super.channelRead(ctx, msg);
}
}
}