Netty Nio 中 promise 的异步更新

Asynchronous update of promise in Netty Nio

我有一个交换信息的服务器和客户端架构。我想return 从服务器连接的频道数。我想 return 使用 promise 将服务器的消息发送给客户端。我的代码是:

public static void callBack () throws Exception{

   String host = "localhost";
   int port = 8080;

   try {
       Bootstrap b = new Bootstrap();
       b.group(workerGroup);
       b.channel(NioSocketChannel.class);
       b.option(ChannelOption.SO_KEEPALIVE, true);
       b.handler(new ChannelInitializer<SocketChannel>() {
        @Override
           public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(), new ClientHandler(promise));
           }
       });
       ChannelFuture f = b.connect(host, port).sync();
       //f.channel().closeFuture().sync();
   }
   finally {
    //workerGroup.shutdownGracefully();
   }
}

public static void main(String[] args) throws Exception {

  callBack();
  while (true) {

    Object msg = promise.get();
    System.out.println("The number if the connected clients is not two");
    int ret = Integer.parseInt(msg.toString());
    if (ret == 2){
        break;
    }
  }
  System.out.println("The number if the connected clients is two");
}

当我 运行 一个客户端时,它总是收到消息 The number if the connected clients is not two 并且 returning 号码始终是一个。当我 运行 第二个客户端时,它总是接收到 returning 值二,但是,第一个客户端仍然接收到一个。对于第一个客户的情况,我找不到更新承诺的正确方法。

编辑: 客户端服务器:

public class ClientHandler extends ChannelInboundHandlerAdapter {
  public final Promise<Object> promise;
  public ClientHandler(Promise<Object> promise) {
      this.promise = promise;
  }

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
      RequestData msg = new RequestData();
      msg.setIntValue(123);
      msg.setStringValue("all work and no play makes jack a dull boy");
      ctx.writeAndFlush(msg);
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      System.out.println(msg);
      promise.trySuccess(msg);
  }
} 

客户端处理程序的代码将从服务器接收到的消息存储到 promise。

使用Netty框架,Promise and a Future都是一次性写入的对象,这个原理使得它们在多线程环境下更容易使用。

既然一个Promise没有做你想要的,我们需要看看其他技术是否适合你的条件,你的条件基本上归结为:

  • 从多个线程读取
  • 仅从单个线程写入(因为在 Netty 通道内,read 方法只能同时由 1 个线程执行,除非通道被标记为可共享)

对于这些要求,最合适的匹配是 volatile 变量,因为这对于读取是线程安全的,并且可以由 1 个线程安全地更新而不用担心写入顺序。

要更新您的代码以使用 volatile 变量,需要进行一些修改,因为我们无法轻松地将引用 link 传递给函数内的变量,但我们必须传递一个更新后端的函数变量。

private static volatile int connectedClients = 0;
public static void callBack () throws Exception{
    //....
           ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(),
                                 new ClientHandler(i -> {connectedClients = i;});
    //....
}

public static void main(String[] args) throws Exception {

  callBack();
  while (true) {
    System.out.println("The number if the connected clients is not two");
    int ret = connectedClients;
    if (ret == 2){
        break;
    }
  }
  System.out.println("The number if the connected clients is two");
}

public class ClientHandler extends ChannelInboundHandlerAdapter {
  public final IntConsumer update;
  public ClientHandler(IntConsumer update) {
      this.update = update;
  }

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
      RequestData msg = new RequestData();
      msg.setIntValue(123);
      msg.setStringValue("all work and no play makes jack a dull boy");
      ctx.writeAndFlush(msg);
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      System.out.println(msg);
      update.accept(Integer.parseInt(msg));
  }
} 

虽然上面的方法应该可行,但我们很快就会发现 class 主程序中的 while 循环使用了大量 CPU 时间,这可能会影响本地客户端系统的其他部分,幸运的是,如果我们在系统中添加其他部分,即同步,这个问题也是可以解决的。通过将 connectedClients 的初始读取留在同步块之外,我们仍然可以从 "true" 情况下的快速读取中获益,并且在“假”情况下,我们可以安全重要的 CPU 循环,可用于系统的其他部分。

为了解决这个问题,我们在阅读时使用以下步骤:

  1. connectedClients 的值存储在一个单独的变量中
  2. 将此变量与目标值进行比较
  3. 如果为真,则尽早跳出循环
  4. 如果为 false,则进入同步块
  5. 开始 while true 循环
  6. 再次读出变量,因为现在值可能会改变
  7. 检查条件,如果条件正确则中断
  8. 如果不是,等待值的变化

并且写的时候如下:

  1. 同步
  2. 更新值
  3. 唤醒等待该值的所有其他线程

可以用如下代码实现:

private static volatile int connectedClients = 0;
private static final Object lock = new Object();
public static void callBack () throws Exception{
    //....
           ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(),
                                 new ClientHandler(i -> {
               synchronized (lock) {
                   connectedClients = i;
                   lock.notifyAll();
               }
           });
    //....
}

public static void main(String[] args) throws Exception {

  callBack();
  int connected = connectedClients;
  if (connected != 2) {
      System.out.println("The number if the connected clients is not two before locking");
      synchronized (lock) {
          while (true) {
              connected = connectedClients;
              if (connected == 2)
                  break;
              System.out.println("The number if the connected clients is not two");
              lock.wait();
          }
      }
  }
  System.out.println("The number if the connected clients is two: " + connected );
}

服务器端变化

但是,并非所有问题都与客户端有关。

因为您将 link 发布到您的 github 存储库,所以当新人加入时,您永远不会将请求从服务器发送回旧客户端。由于未完成此操作,因此永远不会通知客户端更改,请确保也执行此操作。