使响应顺序与 Netty 中的请求顺序相匹配

Make order of responses match order of requests in Netty

上下文

我正在编写一个 Netty 应用程序 (Netty 4),其中处理每条消息可能需要一些时间。作为我的意思的一个例子,我创建了一个 EchoHandler 来响应原始消息,尽管有时会在短暂的延迟之后:

public class EchoHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
    var message = (String) msg;

    if (message.equals("delay")) {
      ctx.executor().schedule(() -> ctx.writeAndFlush(message), 400, TimeUnit.MILLISECONDS);
    } else {
      ctx.writeAndFlush(message);
    }
  }
}

问题

使用此代码,不保证响应的顺序与传入请求的顺序相同。事实上,如果第一条消息由字符串“delay”组成,而第二条消息由其他字符串组成,则响应的顺序将会颠倒!我写了一个测试来说明这一点:

public class Tests {
  @Test
  public void test() throws ExecutionException, InterruptedException {
    var channel = new EmbeddedChannel(new EchoHandler());
    channel.writeInbound("delay", "second message");

    // Let some time pass and process any scheduled tasks
    Thread.sleep(500);
    channel.runPendingTasks();

    // The response to the last message comes first!
    var firstMessage = (String) channel.readOutbound();
    Assertions.assertEquals("second message", firstMessage);

    // The response to the first message comes second!
    var secondMessage = (String) channel.readOutbound();
    Assertions.assertEquals("delay", secondMessage);
  }
}

问题

我正在 Netty 中寻找一种内置方式来保证传出响应的顺序与传入消息的顺序相匹配。使用库的假想扩展,我将重写处理程序如下:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
  var message = (String) msg;

  ctx.pauseMessageProcessing(); // Upcoming messages will be temporarily queued
  
  if (message.equals("delay")) {
    ctx.executor().schedule(() -> {
      ctx.writeAndFlush(message);
      ctx.resumeMessageProcessing(); // After flushing we can resume message processing
    }, 400, TimeUnit.MILLISECONDS);
  } else {
    ctx.writeAndFlush(message);
    ctx.resumeMessageProcessing(); // After flushing we can resume message processing
  }
}

Netty 是否提供开箱即用的类似功能?这听起来像是一个常见的用例,我宁愿依赖经过实战测试的代码,也不愿编写自己的队列实现。

虽然不是内置的,但这个问题看似惯用的解决方案是自定义 MessageToMessageCodec。基于使用 String 作为消息类型的示例,我编写了以下编解码器来为您处理排队:

public class QueuingCodec extends MessageToMessageCodec<String, String> {

  private final Queue<String> messageQueue = new ArrayDeque<>();
  private boolean processingMessage = false;

  @Override
  protected void encode(ChannelHandlerContext ctx, String s, List<Object> list)
      throws Exception {

    // Pass the message on to output
    list.add(s);

    // Allow processing more messages
    processingMessage = false;

    // Send the next message in the queue if available
    if (!messageQueue.isEmpty()) {
      processingMessage = true;
      var message = messageQueue.poll();

      // Pass the message on to input
      ctx.executor().execute(() -> {
        ctx.fireChannelRead(message);
      });
    }
  }

  @Override
  protected void decode(ChannelHandlerContext ctx, String s, List<Object> list)
      throws Exception {

    if (processingMessage) {
      // Store message for later processing
      messageQueue.add(s);
    } else {
      // Pass data on to input
      list.add(s);

      // Prevent processing of new messages
      processingMessage = true;
    }
  }
}