如何将接收到的数据转换为输入流和输出流

How to convert received data to inputstream and outputstream

我必须为 RPC 通信创建 TCP/IP 服务器。我必须使用提供的 java 库来处理所有 "rpc" 的东西。此库收到包含 protobuf 数据的 HDLC 消息。 lib 本身使用请求和响应处理程序来处理 HDLC 和 protobuf 部分。此库可用于串行连接和网络连接。

为此,我们希望使用 netty 作为 TCP 服务器。调用此库时,它需要 "RPC" 方法中的 java.io.inputstreamjava.io.outputstream

我有一个简单的阻塞设置,我在其中创建一个服务器套接字,然后将 socket.getInputStream()socket.getOutputStream() 传递给 RPC 方法。接下来我需要向这个 rpc 对象注册一个(集)rpc 处理程序,然后客户端可以连接并且可以发送数据。对我来说似乎很简单。

我还设置了一个 netty "echo" 服务器,现在我想将这个 RPC 库与 netty 一起使用。我苦恼的是如何将接收到的数据转换为所需的 InputStream 以及如何转换 RPC 库的 OutputStream 以便将其发送回客户端。我需要 decoder/encoder,还是有更简单的方法来做到这一点?如果是这样,我如何将 ByteBuf 转换为 InputStream 并将 OutputStream 转换回可以通过网络发送的格式?

输入流

另一个api有一个readpacket()方法

如果你的库有readPacket方法,你可以使用ByteBufInputStream in combination with a ReplayingDecoder,这个比较容易实现:

public class RPCInputHandler extends ReplayingDecoder<Object> {
   RPC upstream = ....;
   protected void decode(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
      upstream.readPacket(new ByteBufInputStream(buf));
      state(null);
   }
}

远程api使用线程读取流

如果您的上游库使用单独的线程来处理传入消息,您将失去 Netty 的一项主要优势:大量连接的低线程数。

public class RPCInputHandler extends SimpleChannelInboundHandler<ByteBuf> {
   RPC upstream = ....;
   PipedInputStream in;
   PipedOutputStream out;

   @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
       in = new PipedInputStream();
       out = new PipedOutputStream(in);
       upstream.startInput(in);
   }

   @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
       out.close(); // This sends EOF to the other pipe
   }

   // This method is called messageReceived(ChannelHandlerContext, I) in 5.0.
   public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
      byte[] data = new byte[msg.readableBytes()];
      msg.readBytes(data);
      out.write(data);
   }
}

输出流

制作一个将字节写入我们的连接的自定义 OutputStream 很简单,大多数方法直接映射到

public class OutputStreamNetty extends OutputStream {
    final Channel channel = ...;
    ChannelFuture lastFuture;

    private void checkFuture() throws IOException {
        if(lastFuture.isDone()) {
            if(lastFuture.cause() != null) {
                throw new IOException("Downstream write problem", lastFuture.cause() );
            }
            lastFuture = null;
        }
    }

    private void addFuture(ChannelFuture f) {
        if(lastFuture == null) {
            lastFuture = f;
        }
    }

    public void close() throws IOException {
        checkFuture()
        addFuture(channel.close());
    }
    public void flush() throws IOException {
        checkFuture()
        addFuture(channel.flush());
    }
    public void write(byte[] b, int off, int len) throws IOException {
        checkFuture()
        Bytebuf f = channel.alloc().buffer(len);
        f.writeBytes(b, off, len);
        addFuture(channel.write(f));
    }
    public abstract void write(int b) throws IOException {
        checkFuture()
        Bytebuf f = channel.alloc().buffer(1);
        f.writeByte(b);
        addFuture(channel.write(f));
    }

}