如何在其他 class 中读取 netty 中的消息
How to read Message in netty in other class
我想阅读 class 中特定位置的消息,而不是 InboundHandler
。我无法在从 netty 框架调用的 channelRead0
方法中找到读取它的方法。
例如:
context.writeMessage("message");
String msg = context.readMessage;
如果这不可能,我如何将在 channelRead0
方法中获得的结果映射到我在另一个 class 中进行的特定调用?
Netty 框架被设计为异步驱动。使用这个类比,它可以用最少的线程使用量处理大量连接。如果你正在创建一个 api,它使用 netty 框架将调用分派到远程位置,你应该对你的调用使用相同的类比。
与其让你的 api return 直接成为值,不如让 return 成为 Future<?>
or a Promise<?>
。在您的应用程序中有多种实现此系统的方法,最简单的方法是创建一个自定义处理程序,将传入请求映射到 FIFO 队列中的 Promise
s。
这方面的一个例子如下:
这很大程度上基于我过去提交的 答案。
我们从将请求映射到管道中的请求的处理程序开始:
public class MyLastHandler extends SimpleInboundHandler<String> {
private final SynchronousQueue<Promise<String>> queue;
public MyLastHandler (SynchronousQueue<Promise<String>> queue) {
super();
this.queue = queue;
}
// The following is called messageReceived(ChannelHandlerContext, String) in 5.0.
@Override
public void channelRead0(ChannelHandlerContext ctx, String msg) {
this.queue.remove().setSuccss(msg);
// Or setFailure(Throwable)
}
}
然后我们需要有一种将命令发送到远程服务器的方法:
Channel channel = ....;
SynchronousQueue<Promise<String>> queue = ....;
public Future<String> sendCommandAsync(String command) {
return sendCommandAsync(command, new DefaultPromise<>());
}
public Future<String> sendCommandAsync(String command, Promise<String> promise) {
synchronized(channel) {
queue.offer(promise);
channel.write(command);
}
channel.flush();
}
完成我们的方法后,我们需要一种方法来调用它:
sendCommandAsync("USER anonymous",
new DefaultPromise<>().addListener(
(Future<String> f) -> {
String response = f.get();
if (response.startWidth("331")) {
// do something
}
// etc
}
)
);
如果被叫方想使用我们的 api 作为阻塞呼叫,他也可以这样做:
String response = sendCommandAsync("USER anonymous").get();
if (response.startWidth("331")) {
// do something
}
// etc
注意Future.get()
can throw an InterruptedException
如果线程状态被中断,不像套接字读取操作,只能通过套接字上的一些交互来取消。此异常在 FutureListener
.
中应该不是问题
我想阅读 class 中特定位置的消息,而不是 InboundHandler
。我无法在从 netty 框架调用的 channelRead0
方法中找到读取它的方法。
例如:
context.writeMessage("message");
String msg = context.readMessage;
如果这不可能,我如何将在 channelRead0
方法中获得的结果映射到我在另一个 class 中进行的特定调用?
Netty 框架被设计为异步驱动。使用这个类比,它可以用最少的线程使用量处理大量连接。如果你正在创建一个 api,它使用 netty 框架将调用分派到远程位置,你应该对你的调用使用相同的类比。
与其让你的 api return 直接成为值,不如让 return 成为 Future<?>
or a Promise<?>
。在您的应用程序中有多种实现此系统的方法,最简单的方法是创建一个自定义处理程序,将传入请求映射到 FIFO 队列中的 Promise
s。
这方面的一个例子如下:
这很大程度上基于我过去提交的
我们从将请求映射到管道中的请求的处理程序开始:
public class MyLastHandler extends SimpleInboundHandler<String> {
private final SynchronousQueue<Promise<String>> queue;
public MyLastHandler (SynchronousQueue<Promise<String>> queue) {
super();
this.queue = queue;
}
// The following is called messageReceived(ChannelHandlerContext, String) in 5.0.
@Override
public void channelRead0(ChannelHandlerContext ctx, String msg) {
this.queue.remove().setSuccss(msg);
// Or setFailure(Throwable)
}
}
然后我们需要有一种将命令发送到远程服务器的方法:
Channel channel = ....;
SynchronousQueue<Promise<String>> queue = ....;
public Future<String> sendCommandAsync(String command) {
return sendCommandAsync(command, new DefaultPromise<>());
}
public Future<String> sendCommandAsync(String command, Promise<String> promise) {
synchronized(channel) {
queue.offer(promise);
channel.write(command);
}
channel.flush();
}
完成我们的方法后,我们需要一种方法来调用它:
sendCommandAsync("USER anonymous",
new DefaultPromise<>().addListener(
(Future<String> f) -> {
String response = f.get();
if (response.startWidth("331")) {
// do something
}
// etc
}
)
);
如果被叫方想使用我们的 api 作为阻塞呼叫,他也可以这样做:
String response = sendCommandAsync("USER anonymous").get();
if (response.startWidth("331")) {
// do something
}
// etc
注意Future.get()
can throw an InterruptedException
如果线程状态被中断,不像套接字读取操作,只能通过套接字上的一些交互来取消。此异常在 FutureListener
.