如何使用 vert.x-rx 创建响应式客户端-服务器 TCP 通信
How to use vert.x-rx to create reactive client-server TCP communication
我目前正在从事一个项目,该项目需要在外部系统和我将编写的应用程序之间进行 TCP 通信(在 Java 中)。众所周知,这可以使用常规 NIO 轻松实现。但是,作为我正在进行的这个新项目的一部分,我必须使用 Vert.x 来提供 TCP 通信。请参考下图:
在右侧,我的应用程序作为 TCP 服务器运行,在左侧等待来自外部系统的连接。我读过创建一个 TCP 并监听连接你简单地做这样的事情:
NetServer server = vertx.createNetServer();
server.listen(1234, "localhost", res -> {
if (res.succeeded()) {
System.out.println("Server is now listening!");
} else {
System.out.println("Failed to bind!");
}
});
但是,我无法弄清楚的一点是当外部系统连接到我的应用程序并通过 TCP 发送 EchoRequestMessages 时如何处理。我的应用程序必须获取接收到的字节缓冲区,将其解码为 EchoRequestMessage POJO,然后将 EchoResponseMessage 编码为字节缓冲区以发送回外部系统。
我如何使用 vert.x-rx 对 EchoRequestMessage 的接收、它的解码、EchoResponseMessage 的编码执行响应式编程,然后将其发送回外部系统,所有这些都在一个构建器中模式类型设置。我读过有关 Observables 和订阅的内容,但我不知道要观察什么或订阅什么。任何帮助将不胜感激。
要从套接字读取数据,您可以使用 RecordParser
。在套接字连接上,数据通常由换行符分隔:
RecordParser parser = RecordParser.newDelimited("\n", sock);
A RecordParser
是 Vert.x ReadStream
所以它可以转换成 Flowable
:
FlowableHelper.toFlowable(parser)
现在如果 EchoRequestMessage
可以从 Buffer
创建:
public class EchoRequestMessage {
private String message;
public static EchoRequestMessage fromBuffer(Buffer buffer) {
// Deserialize
}
public String getMessage() {
return message;
}
}
然后 EchoResponseMessage
转换为 Buffer
:
public class EchoResponseMessage {
private final String message;
public EchoResponseMessage(String message) {
this.message = message;
}
public Buffer toBuffer() {
// Serialize;
}
}
您可以使用 RxJava 运算符来实现回显服务器流:
vertx.createNetServer().connectHandler(sock -> {
RecordParser parser = RecordParser.newDelimited("\n", sock);
FlowableHelper.toFlowable(parser)
.map(EchoRequestMessage::fromBuffer)
.map(echoRequestMessage -> {
return new EchoResponseMessage(echoRequestMessage.getMessage());
})
.subscribe(echoResponseMessage -> sock.write(echoResponseMessage.toBuffer()).write("\n"), throwable -> {
throwable.printStackTrace();
sock.close();
}, sock::close);
}).listen(1234);
[编辑] 如果在您的协议消息中不是行分隔而是长度前缀,那么您可以创建自定义 ReadStream
:
class LengthPrefixedStream implements ReadStream<Buffer> {
final RecordParser recordParser;
boolean prefix = false;
private LengthPrefixedStream(ReadStream<Buffer> stream) {
recordParser = RecordParser.newFixed(4, stream);
}
@Override
public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
recordParser.exceptionHandler(handler);
return this;
}
@Override
public ReadStream<Buffer> handler(Handler<Buffer> handler) {
if (handler == null) {
recordParser.handler(null);
return this;
}
recordParser.handler(buffer -> {
if (prefix) {
prefix = false;
recordParser.fixedSizeMode(buffer.getInt(0));
} else {
prefix = true;
recordParser.fixedSizeMode(4);
handler.handle(buffer);
}
});
return this;
}
@Override
public ReadStream<Buffer> pause() {
recordParser.pause();
return this;
}
@Override
public ReadStream<Buffer> resume() {
recordParser.resume();
return this;
}
@Override
public ReadStream<Buffer> endHandler(Handler<Void> endHandler) {
recordParser.endHandler(endHandler);
return this;
}
}
并将其转换为 Flowable
:
FlowableHelper.toFlowable(new LengthPrefixedStream(sock))
我发现使用 RecordParser.fixedSizeMode() 方法将传入数据包聚合到一条消息相当复杂。
我用RecordParser.newDelimited。
如果您可以操纵发送到您的 TCP 接收器的数据,那么您可以在其末尾添加一个唯一的定界符,而不是为每条发送的消息添加 4 字节消息长度前缀。
代码示例:(我没有使用 RxJava 语法)
发送中:
byte[] serialized = SerializationUtils.serialize(o);
senderSocket.write(Buffer.buffer(serialized).appendString(MSG_DELIMITER));
System.out.println("I sent some bytes: " + serialized.length);
阅读:
protected static String MSG_DELIMITER = "_I_Am_so0o_UNIQUEQE_";
protected Vertx vertx;
protected final RecordParser parser = RecordParser.newDelimited(MSG_DELIMITER, new Output());
public static main(String[] args) {
this.vertx = Vertx.vertx();
this.vertx.deployVerticle(new Receiver());
}
public class Receiver extends AbstractVerticle {
@Override
public void start() {
NetServer receiver = this.vertx.createNetServer();
receiver.connectHandler(socket ->
socket.handler(parser::handle));
receiver.listen(port, host, res -> {
if (res.succeeded()) {
System.out.println("Receiver is now listening!");
} else {
System.out.println("Failed to bind!");
}
});
}
@Override
public void stop() {
System.out.println("Receiver stopped");
}
}
public class Output implements Handler<Buffer> {
@Override
public void handle(Buffer event) {
try {
E deserialized = (E) SerializationUtils.deserialize(event.getBytes());
queue.put(deserialized);
System.out.println("I received some bytes: " + event.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
我目前正在从事一个项目,该项目需要在外部系统和我将编写的应用程序之间进行 TCP 通信(在 Java 中)。众所周知,这可以使用常规 NIO 轻松实现。但是,作为我正在进行的这个新项目的一部分,我必须使用 Vert.x 来提供 TCP 通信。请参考下图:
在右侧,我的应用程序作为 TCP 服务器运行,在左侧等待来自外部系统的连接。我读过创建一个 TCP 并监听连接你简单地做这样的事情:
NetServer server = vertx.createNetServer();
server.listen(1234, "localhost", res -> {
if (res.succeeded()) {
System.out.println("Server is now listening!");
} else {
System.out.println("Failed to bind!");
}
});
但是,我无法弄清楚的一点是当外部系统连接到我的应用程序并通过 TCP 发送 EchoRequestMessages 时如何处理。我的应用程序必须获取接收到的字节缓冲区,将其解码为 EchoRequestMessage POJO,然后将 EchoResponseMessage 编码为字节缓冲区以发送回外部系统。
我如何使用 vert.x-rx 对 EchoRequestMessage 的接收、它的解码、EchoResponseMessage 的编码执行响应式编程,然后将其发送回外部系统,所有这些都在一个构建器中模式类型设置。我读过有关 Observables 和订阅的内容,但我不知道要观察什么或订阅什么。任何帮助将不胜感激。
要从套接字读取数据,您可以使用 RecordParser
。在套接字连接上,数据通常由换行符分隔:
RecordParser parser = RecordParser.newDelimited("\n", sock);
A RecordParser
是 Vert.x ReadStream
所以它可以转换成 Flowable
:
FlowableHelper.toFlowable(parser)
现在如果 EchoRequestMessage
可以从 Buffer
创建:
public class EchoRequestMessage {
private String message;
public static EchoRequestMessage fromBuffer(Buffer buffer) {
// Deserialize
}
public String getMessage() {
return message;
}
}
然后 EchoResponseMessage
转换为 Buffer
:
public class EchoResponseMessage {
private final String message;
public EchoResponseMessage(String message) {
this.message = message;
}
public Buffer toBuffer() {
// Serialize;
}
}
您可以使用 RxJava 运算符来实现回显服务器流:
vertx.createNetServer().connectHandler(sock -> {
RecordParser parser = RecordParser.newDelimited("\n", sock);
FlowableHelper.toFlowable(parser)
.map(EchoRequestMessage::fromBuffer)
.map(echoRequestMessage -> {
return new EchoResponseMessage(echoRequestMessage.getMessage());
})
.subscribe(echoResponseMessage -> sock.write(echoResponseMessage.toBuffer()).write("\n"), throwable -> {
throwable.printStackTrace();
sock.close();
}, sock::close);
}).listen(1234);
[编辑] 如果在您的协议消息中不是行分隔而是长度前缀,那么您可以创建自定义 ReadStream
:
class LengthPrefixedStream implements ReadStream<Buffer> {
final RecordParser recordParser;
boolean prefix = false;
private LengthPrefixedStream(ReadStream<Buffer> stream) {
recordParser = RecordParser.newFixed(4, stream);
}
@Override
public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
recordParser.exceptionHandler(handler);
return this;
}
@Override
public ReadStream<Buffer> handler(Handler<Buffer> handler) {
if (handler == null) {
recordParser.handler(null);
return this;
}
recordParser.handler(buffer -> {
if (prefix) {
prefix = false;
recordParser.fixedSizeMode(buffer.getInt(0));
} else {
prefix = true;
recordParser.fixedSizeMode(4);
handler.handle(buffer);
}
});
return this;
}
@Override
public ReadStream<Buffer> pause() {
recordParser.pause();
return this;
}
@Override
public ReadStream<Buffer> resume() {
recordParser.resume();
return this;
}
@Override
public ReadStream<Buffer> endHandler(Handler<Void> endHandler) {
recordParser.endHandler(endHandler);
return this;
}
}
并将其转换为 Flowable
:
FlowableHelper.toFlowable(new LengthPrefixedStream(sock))
我发现使用 RecordParser.fixedSizeMode() 方法将传入数据包聚合到一条消息相当复杂。
我用RecordParser.newDelimited。 如果您可以操纵发送到您的 TCP 接收器的数据,那么您可以在其末尾添加一个唯一的定界符,而不是为每条发送的消息添加 4 字节消息长度前缀。
代码示例:(我没有使用 RxJava 语法)
发送中:
byte[] serialized = SerializationUtils.serialize(o);
senderSocket.write(Buffer.buffer(serialized).appendString(MSG_DELIMITER));
System.out.println("I sent some bytes: " + serialized.length);
阅读:
protected static String MSG_DELIMITER = "_I_Am_so0o_UNIQUEQE_";
protected Vertx vertx;
protected final RecordParser parser = RecordParser.newDelimited(MSG_DELIMITER, new Output());
public static main(String[] args) {
this.vertx = Vertx.vertx();
this.vertx.deployVerticle(new Receiver());
}
public class Receiver extends AbstractVerticle {
@Override
public void start() {
NetServer receiver = this.vertx.createNetServer();
receiver.connectHandler(socket ->
socket.handler(parser::handle));
receiver.listen(port, host, res -> {
if (res.succeeded()) {
System.out.println("Receiver is now listening!");
} else {
System.out.println("Failed to bind!");
}
});
}
@Override
public void stop() {
System.out.println("Receiver stopped");
}
}
public class Output implements Handler<Buffer> {
@Override
public void handle(Buffer event) {
try {
E deserialized = (E) SerializationUtils.deserialize(event.getBytes());
queue.put(deserialized);
System.out.println("I received some bytes: " + event.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}