如何使用 vert.x-rx 创建响应式客户端-服务器 TCP 通信

How to use vert.x-rx to create reactive client-server TCP communication

我目前正在从事一个项目,该项目需要在外部系统和我将编写的应用程序之间进行 TCP 通信(在 Java 中)。众所周知,这可以使用常规 NIO 轻松实现。但是,作为我正在进行的这个新项目的一部分,我必须使用 Vert.x 来提供 TCP 通信。请参考下图:

在右侧,我的应用程序作为 TCP 服务器运行,在左侧等待来自外部系统的连接。我读过创建一个 TCP 并监听连接你简单地做这样的事情:

NetServer server = vertx.createNetServer();
server.listen(1234, "localhost", res -> {
  if (res.succeeded()) {
    System.out.println("Server is now listening!");
  } else {
    System.out.println("Failed to bind!");
  }
});

但是,我无法弄清楚的一点是当外部系统连接到我的应用程序并通过 TCP 发送 EchoRequestMessages 时如何处理。我的应用程序必须获取接收到的字节缓冲区,将其解码为 EchoRequestMessage POJO,然后将 EchoResponseMessage 编码为字节缓冲区以发送回外部系统。

我如何使用 vert.x-rx 对 EchoRequestMessage 的接收、它的解码、EchoResponseMessage 的编码执行响应式编程,然后将其发送回外部系统,所有这些都在一个构建器中模式类型设置。我读过有关 Observables 和订阅的内容,但我不知道要观察什么或订阅什么。任何帮助将不胜感激。

要从套接字读取数据,您可以使用 RecordParser。在套接字连接上,数据通常由换行符分隔:

RecordParser parser = RecordParser.newDelimited("\n", sock);

A RecordParser 是 Vert.x ReadStream 所以它可以转换成 Flowable:

FlowableHelper.toFlowable(parser)

现在如果 EchoRequestMessage 可以从 Buffer 创建:

public class EchoRequestMessage {
  private String message;

  public static EchoRequestMessage fromBuffer(Buffer buffer) {
    // Deserialize
  }

  public String getMessage() {
    return message;
   }
 }

然后 EchoResponseMessage 转换为 Buffer:

public class EchoResponseMessage {
  private final String message;

  public EchoResponseMessage(String message) {
    this.message = message;
  }

  public Buffer toBuffer() {
    // Serialize;
  }
}

您可以使用 RxJava 运算符来实现回显服务器流:

vertx.createNetServer().connectHandler(sock -> {

  RecordParser parser = RecordParser.newDelimited("\n", sock);

  FlowableHelper.toFlowable(parser)
    .map(EchoRequestMessage::fromBuffer)
    .map(echoRequestMessage -> {
      return new EchoResponseMessage(echoRequestMessage.getMessage());
    })
    .subscribe(echoResponseMessage -> sock.write(echoResponseMessage.toBuffer()).write("\n"), throwable -> {
      throwable.printStackTrace();
      sock.close();
    }, sock::close);

}).listen(1234);

[编辑] 如果在您的协议消息中不是行分隔而是长度前缀,那么您可以创建自定义 ReadStream:

class LengthPrefixedStream implements ReadStream<Buffer> {
  final RecordParser recordParser;
  boolean prefix = false;

  private LengthPrefixedStream(ReadStream<Buffer> stream) {
    recordParser = RecordParser.newFixed(4, stream);
  }

  @Override
  public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
    recordParser.exceptionHandler(handler);
    return this;
  }

  @Override
  public ReadStream<Buffer> handler(Handler<Buffer> handler) {
    if (handler == null) {
      recordParser.handler(null);
      return this;
    }
    recordParser.handler(buffer -> {
      if (prefix) {
        prefix = false;
        recordParser.fixedSizeMode(buffer.getInt(0));
      } else {
        prefix = true;
        recordParser.fixedSizeMode(4);
        handler.handle(buffer);
      }
    });
    return this;
  }

  @Override
  public ReadStream<Buffer> pause() {
    recordParser.pause();
    return this;
  }

  @Override
  public ReadStream<Buffer> resume() {
    recordParser.resume();
    return this;
  }

  @Override
  public ReadStream<Buffer> endHandler(Handler<Void> endHandler) {
    recordParser.endHandler(endHandler);
    return this;
  }
}

并将其转换为 Flowable:

FlowableHelper.toFlowable(new LengthPrefixedStream(sock))

我发现使用 RecordParser.fixedSizeMode() 方法将传入数据包聚合到一条消息相当复杂。

我用RecordParser.newDelimited。 如果您可以操纵发送到您的 TCP 接收器的数据,那么您可以在其末尾添加一个唯一的定界符,而不是为每条发送的消息添加 4 字节消息长度前缀。

代码示例:(我没有使用 RxJava 语法)

发送中:

byte[] serialized = SerializationUtils.serialize(o);
senderSocket.write(Buffer.buffer(serialized).appendString(MSG_DELIMITER));
System.out.println("I sent some bytes: " + serialized.length);

阅读:

protected static String MSG_DELIMITER = "_I_Am_so0o_UNIQUEQE_";
protected Vertx vertx;
protected final RecordParser parser = RecordParser.newDelimited(MSG_DELIMITER, new Output());

public static main(String[] args) {
    this.vertx = Vertx.vertx();
    this.vertx.deployVerticle(new Receiver());
}

public class Receiver extends AbstractVerticle {
    @Override
    public void start() {

        NetServer receiver = this.vertx.createNetServer();

        receiver.connectHandler(socket ->

                socket.handler(parser::handle));

        receiver.listen(port, host, res -> {
            if (res.succeeded()) {
                System.out.println("Receiver is now listening!");
            } else {
                System.out.println("Failed to bind!");
            }
        });
    }

    @Override
    public void stop() {
        System.out.println("Receiver stopped");
    }
}

public class Output implements Handler<Buffer> {

    @Override
    public void handle(Buffer event) {
        try {
            E deserialized = (E) SerializationUtils.deserialize(event.getBytes());
            queue.put(deserialized);
            System.out.println("I received some bytes: " + event.length());

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}