使响应顺序与 Netty 中的请求顺序相匹配
Make order of responses match order of requests in Netty
上下文
我正在编写一个 Netty 应用程序 (Netty 4),其中处理每条消息可能需要一些时间。作为我的意思的一个例子,我创建了一个 EchoHandler
来响应原始消息,尽管有时会在短暂的延迟之后:
public class EchoHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
var message = (String) msg;
if (message.equals("delay")) {
ctx.executor().schedule(() -> ctx.writeAndFlush(message), 400, TimeUnit.MILLISECONDS);
} else {
ctx.writeAndFlush(message);
}
}
}
问题
使用此代码,不保证响应的顺序与传入请求的顺序相同。事实上,如果第一条消息由字符串“delay”组成,而第二条消息由其他字符串组成,则响应的顺序将会颠倒!我写了一个测试来说明这一点:
public class Tests {
@Test
public void test() throws ExecutionException, InterruptedException {
var channel = new EmbeddedChannel(new EchoHandler());
channel.writeInbound("delay", "second message");
// Let some time pass and process any scheduled tasks
Thread.sleep(500);
channel.runPendingTasks();
// The response to the last message comes first!
var firstMessage = (String) channel.readOutbound();
Assertions.assertEquals("second message", firstMessage);
// The response to the first message comes second!
var secondMessage = (String) channel.readOutbound();
Assertions.assertEquals("delay", secondMessage);
}
}
问题
我正在 Netty 中寻找一种内置方式来保证传出响应的顺序与传入消息的顺序相匹配。使用库的假想扩展,我将重写处理程序如下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
var message = (String) msg;
ctx.pauseMessageProcessing(); // Upcoming messages will be temporarily queued
if (message.equals("delay")) {
ctx.executor().schedule(() -> {
ctx.writeAndFlush(message);
ctx.resumeMessageProcessing(); // After flushing we can resume message processing
}, 400, TimeUnit.MILLISECONDS);
} else {
ctx.writeAndFlush(message);
ctx.resumeMessageProcessing(); // After flushing we can resume message processing
}
}
Netty 是否提供开箱即用的类似功能?这听起来像是一个常见的用例,我宁愿依赖经过实战测试的代码,也不愿编写自己的队列实现。
虽然不是内置的,但这个问题看似惯用的解决方案是自定义 MessageToMessageCodec
。基于使用 String
作为消息类型的示例,我编写了以下编解码器来为您处理排队:
public class QueuingCodec extends MessageToMessageCodec<String, String> {
private final Queue<String> messageQueue = new ArrayDeque<>();
private boolean processingMessage = false;
@Override
protected void encode(ChannelHandlerContext ctx, String s, List<Object> list)
throws Exception {
// Pass the message on to output
list.add(s);
// Allow processing more messages
processingMessage = false;
// Send the next message in the queue if available
if (!messageQueue.isEmpty()) {
processingMessage = true;
var message = messageQueue.poll();
// Pass the message on to input
ctx.executor().execute(() -> {
ctx.fireChannelRead(message);
});
}
}
@Override
protected void decode(ChannelHandlerContext ctx, String s, List<Object> list)
throws Exception {
if (processingMessage) {
// Store message for later processing
messageQueue.add(s);
} else {
// Pass data on to input
list.add(s);
// Prevent processing of new messages
processingMessage = true;
}
}
}
上下文
我正在编写一个 Netty 应用程序 (Netty 4),其中处理每条消息可能需要一些时间。作为我的意思的一个例子,我创建了一个 EchoHandler
来响应原始消息,尽管有时会在短暂的延迟之后:
public class EchoHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
var message = (String) msg;
if (message.equals("delay")) {
ctx.executor().schedule(() -> ctx.writeAndFlush(message), 400, TimeUnit.MILLISECONDS);
} else {
ctx.writeAndFlush(message);
}
}
}
问题
使用此代码,不保证响应的顺序与传入请求的顺序相同。事实上,如果第一条消息由字符串“delay”组成,而第二条消息由其他字符串组成,则响应的顺序将会颠倒!我写了一个测试来说明这一点:
public class Tests {
@Test
public void test() throws ExecutionException, InterruptedException {
var channel = new EmbeddedChannel(new EchoHandler());
channel.writeInbound("delay", "second message");
// Let some time pass and process any scheduled tasks
Thread.sleep(500);
channel.runPendingTasks();
// The response to the last message comes first!
var firstMessage = (String) channel.readOutbound();
Assertions.assertEquals("second message", firstMessage);
// The response to the first message comes second!
var secondMessage = (String) channel.readOutbound();
Assertions.assertEquals("delay", secondMessage);
}
}
问题
我正在 Netty 中寻找一种内置方式来保证传出响应的顺序与传入消息的顺序相匹配。使用库的假想扩展,我将重写处理程序如下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
var message = (String) msg;
ctx.pauseMessageProcessing(); // Upcoming messages will be temporarily queued
if (message.equals("delay")) {
ctx.executor().schedule(() -> {
ctx.writeAndFlush(message);
ctx.resumeMessageProcessing(); // After flushing we can resume message processing
}, 400, TimeUnit.MILLISECONDS);
} else {
ctx.writeAndFlush(message);
ctx.resumeMessageProcessing(); // After flushing we can resume message processing
}
}
Netty 是否提供开箱即用的类似功能?这听起来像是一个常见的用例,我宁愿依赖经过实战测试的代码,也不愿编写自己的队列实现。
虽然不是内置的,但这个问题看似惯用的解决方案是自定义 MessageToMessageCodec
。基于使用 String
作为消息类型的示例,我编写了以下编解码器来为您处理排队:
public class QueuingCodec extends MessageToMessageCodec<String, String> {
private final Queue<String> messageQueue = new ArrayDeque<>();
private boolean processingMessage = false;
@Override
protected void encode(ChannelHandlerContext ctx, String s, List<Object> list)
throws Exception {
// Pass the message on to output
list.add(s);
// Allow processing more messages
processingMessage = false;
// Send the next message in the queue if available
if (!messageQueue.isEmpty()) {
processingMessage = true;
var message = messageQueue.poll();
// Pass the message on to input
ctx.executor().execute(() -> {
ctx.fireChannelRead(message);
});
}
}
@Override
protected void decode(ChannelHandlerContext ctx, String s, List<Object> list)
throws Exception {
if (processingMessage) {
// Store message for later processing
messageQueue.add(s);
} else {
// Pass data on to input
list.add(s);
// Prevent processing of new messages
processingMessage = true;
}
}
}