具有异步调用方的 Netty 同步客户端

Netty synchronous client with asynchronous callers

我正在创建一个服务器,它使用来自 JMS、SNMP、HTTP 等众多来源的命令。这些都是异步的并且工作正常。服务器保持与单个遗留硬件的单一连接,该硬件具有带有自定义 TCP 协议的 request/reply 架构。 理想情况下,我想要一个像这种阻塞类型方法的命令

public Response issueCommandToLegacyHardware(Command command)

或者这个异步类型的方法

public Future<Response> issueCommandToLegacyHardware(Command command)

我对Netty和异步编程比较陌生,基本上是边学边学。我目前的想法是,我的 LegacyHardwareClient class 将有 public synchronized issueCommandToLegacyHardware(Command command),将写入到遗留硬件的客户端通道,然后 take() 来自 SynchronousQueue<Response>这将阻止。管道中的 ChannelInboundHandler 将 offer() 一个 ResponseSynchronousQueue>Response>,这将允许 take() 解锁并接收数据。

这也太绕了吧?我可以查看有关同步 Netty 客户端实现的任何示例吗? Netty 有什么最佳实践吗? 我显然可以只使用标准 Java 套接字,但是 Netty 解析自定义协议的强大功能以及易维护性实在是太棒了,不能放弃。

更新: 就实现而言,我使用了 ArrayBlockingQueue<>() 并且我使用了 put() 和 remove() 而不是 offer() 和 remove()。因为我想确保仅在回复任何活动请求时才发送对遗留硬件的后续请求,否则无法确定遗留硬件行为。

offer() 和 remove() 对我不起作用的原因是如果没有主动阻止另一方的 take() 请求,offer() 命令将不会传递任何内容。相反,remove() 不会 return 任何东西,除非有阻塞的 put() 调用插入数据。 我无法使用 put()/remove(),因为永远不会到达 remove() 语句,因为没有请求写入通道以触发调用 remove() 的事件。我无法使用 offer()/take(),因为 offer() 语句会 return false,因为 take() 调用尚未执行。 使用容量为1的ArrayBlockingQueue<>(),保证一次只能执行一个命令。任何其他命令都会阻塞,直到有足够的空间插入,容量为 1 这意味着它必须是空的。一旦从遗留硬件收到响应,就会清空队列。这确保了对遗留硬件的良好同步行为,但为遗留硬件的用户提供了异步 API,其中有很多。

与其使用 SynchronousQueue<Response> 以阻塞方式设计应用程序,不如使用 SynchronousQueue<Promise<Response>>.

以非阻塞方式设计应用程序

你的 public Future<Response> issueCommandToLegacyHardware(Command command) 然后应该使用 offer() 添加一个 DefaultPromise<>() 到队列,然后 netty 管道可以使用 remove() 来获取该请求的响应,请注意我使用了 remove() instead of take(),因为只有在特殊情况下,才会出现 none 元素。

对此的快速实现可能是:

public class MyLastHandler extends SimpleInboundHandler<Response> {
    private final SynchronousQueue<Promise<Response>> queue;

    public MyLastHandler (SynchronousQueue<Promise<Response>> queue) {
        super();
        this.queue = queue;
    }

    // The following is called messageReceived(ChannelHandlerContext, Response) in 5.0.
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Response msg) {
        this.queue.remove().setSuccss(msg); // Or setFailure(Throwable)
    }
}

上面的处理程序应该放在链的最后。

public Future<Response> issueCommandToLegacyHardware(Command command)的实现可以看一下:

Channel channel = ....;
SynchronousQueue<Promise<Response>> queue = ....;

public Future<Response> issueCommandToLegacyHardware(Command command) {
    return issueCommandToLegacyHardware(command, channel.eventLoop().newPromise());
}

public Future<Response> issueCommandToLegacyHardware(Command command, Promise<Response> promise) {
    queue.offer(promise);
    channel.write(command);
    return promise;
}

issueCommandToLegacyHardware 上使用重载的方法也是 Channel.write 使用的设计模式,这使得它非常灵活。

此设计模式可在客户端代码中按如下方式使用:

issueCommandToLegacyHardware(
    Command.TAKE_OVER_THE_WORLD_WITH_FIRE, 
    channel.eventLoop().newPromise()
).addListener(
    (Future<Response> f) -> {
        System.out.println("We have taken over the world: " + f.get());
    }
);

这种设计模式的优点是没有在任何地方使用不必要的阻塞,只是简单的异步逻辑。

附录一:Javadoc:

Promise Future DefaultPromise