如何将接收到的数据转换为输入流和输出流
How to convert received data to inputstream and outputstream
我必须为 RPC 通信创建 TCP/IP 服务器。我必须使用提供的 java 库来处理所有 "rpc" 的东西。此库收到包含 protobuf 数据的 HDLC 消息。 lib 本身使用请求和响应处理程序来处理 HDLC 和 protobuf 部分。此库可用于串行连接和网络连接。
为此,我们希望使用 netty 作为 TCP 服务器。调用此库时,它需要 "RPC" 方法中的 java.io.inputstream
和 java.io.outputstream
。
我有一个简单的阻塞设置,我在其中创建一个服务器套接字,然后将 socket.getInputStream()
和 socket.getOutputStream()
传递给 RPC 方法。接下来我需要向这个 rpc 对象注册一个(集)rpc 处理程序,然后客户端可以连接并且可以发送数据。对我来说似乎很简单。
我还设置了一个 netty "echo" 服务器,现在我想将这个 RPC 库与 netty 一起使用。我苦恼的是如何将接收到的数据转换为所需的 InputStream
以及如何转换 RPC 库的 OutputStream
以便将其发送回客户端。我需要 decoder/encoder,还是有更简单的方法来做到这一点?如果是这样,我如何将 ByteBuf
转换为 InputStream
并将 OutputStream
转换回可以通过网络发送的格式?
输入流
另一个api有一个readpacket()方法
如果你的库有readPacket方法,你可以使用ByteBufInputStream
in combination with a ReplayingDecoder
,这个比较容易实现:
public class RPCInputHandler extends ReplayingDecoder<Object> {
RPC upstream = ....;
protected void decode(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
upstream.readPacket(new ByteBufInputStream(buf));
state(null);
}
}
远程api使用线程读取流
如果您的上游库使用单独的线程来处理传入消息,您将失去 Netty 的一项主要优势:大量连接的低线程数。
public class RPCInputHandler extends SimpleChannelInboundHandler<ByteBuf> {
RPC upstream = ....;
PipedInputStream in;
PipedOutputStream out;
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
in = new PipedInputStream();
out = new PipedOutputStream(in);
upstream.startInput(in);
}
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
out.close(); // This sends EOF to the other pipe
}
// This method is called messageReceived(ChannelHandlerContext, I) in 5.0.
public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] data = new byte[msg.readableBytes()];
msg.readBytes(data);
out.write(data);
}
}
输出流
制作一个将字节写入我们的连接的自定义 OutputStream 很简单,大多数方法直接映射到
public class OutputStreamNetty extends OutputStream {
final Channel channel = ...;
ChannelFuture lastFuture;
private void checkFuture() throws IOException {
if(lastFuture.isDone()) {
if(lastFuture.cause() != null) {
throw new IOException("Downstream write problem", lastFuture.cause() );
}
lastFuture = null;
}
}
private void addFuture(ChannelFuture f) {
if(lastFuture == null) {
lastFuture = f;
}
}
public void close() throws IOException {
checkFuture()
addFuture(channel.close());
}
public void flush() throws IOException {
checkFuture()
addFuture(channel.flush());
}
public void write(byte[] b, int off, int len) throws IOException {
checkFuture()
Bytebuf f = channel.alloc().buffer(len);
f.writeBytes(b, off, len);
addFuture(channel.write(f));
}
public abstract void write(int b) throws IOException {
checkFuture()
Bytebuf f = channel.alloc().buffer(1);
f.writeByte(b);
addFuture(channel.write(f));
}
}
我必须为 RPC 通信创建 TCP/IP 服务器。我必须使用提供的 java 库来处理所有 "rpc" 的东西。此库收到包含 protobuf 数据的 HDLC 消息。 lib 本身使用请求和响应处理程序来处理 HDLC 和 protobuf 部分。此库可用于串行连接和网络连接。
为此,我们希望使用 netty 作为 TCP 服务器。调用此库时,它需要 "RPC" 方法中的 java.io.inputstream
和 java.io.outputstream
。
我有一个简单的阻塞设置,我在其中创建一个服务器套接字,然后将 socket.getInputStream()
和 socket.getOutputStream()
传递给 RPC 方法。接下来我需要向这个 rpc 对象注册一个(集)rpc 处理程序,然后客户端可以连接并且可以发送数据。对我来说似乎很简单。
我还设置了一个 netty "echo" 服务器,现在我想将这个 RPC 库与 netty 一起使用。我苦恼的是如何将接收到的数据转换为所需的 InputStream
以及如何转换 RPC 库的 OutputStream
以便将其发送回客户端。我需要 decoder/encoder,还是有更简单的方法来做到这一点?如果是这样,我如何将 ByteBuf
转换为 InputStream
并将 OutputStream
转换回可以通过网络发送的格式?
输入流
另一个api有一个readpacket()方法
如果你的库有readPacket方法,你可以使用ByteBufInputStream
in combination with a ReplayingDecoder
,这个比较容易实现:
public class RPCInputHandler extends ReplayingDecoder<Object> {
RPC upstream = ....;
protected void decode(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
upstream.readPacket(new ByteBufInputStream(buf));
state(null);
}
}
远程api使用线程读取流
如果您的上游库使用单独的线程来处理传入消息,您将失去 Netty 的一项主要优势:大量连接的低线程数。
public class RPCInputHandler extends SimpleChannelInboundHandler<ByteBuf> {
RPC upstream = ....;
PipedInputStream in;
PipedOutputStream out;
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
in = new PipedInputStream();
out = new PipedOutputStream(in);
upstream.startInput(in);
}
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
out.close(); // This sends EOF to the other pipe
}
// This method is called messageReceived(ChannelHandlerContext, I) in 5.0.
public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] data = new byte[msg.readableBytes()];
msg.readBytes(data);
out.write(data);
}
}
输出流
制作一个将字节写入我们的连接的自定义 OutputStream 很简单,大多数方法直接映射到
public class OutputStreamNetty extends OutputStream {
final Channel channel = ...;
ChannelFuture lastFuture;
private void checkFuture() throws IOException {
if(lastFuture.isDone()) {
if(lastFuture.cause() != null) {
throw new IOException("Downstream write problem", lastFuture.cause() );
}
lastFuture = null;
}
}
private void addFuture(ChannelFuture f) {
if(lastFuture == null) {
lastFuture = f;
}
}
public void close() throws IOException {
checkFuture()
addFuture(channel.close());
}
public void flush() throws IOException {
checkFuture()
addFuture(channel.flush());
}
public void write(byte[] b, int off, int len) throws IOException {
checkFuture()
Bytebuf f = channel.alloc().buffer(len);
f.writeBytes(b, off, len);
addFuture(channel.write(f));
}
public abstract void write(int b) throws IOException {
checkFuture()
Bytebuf f = channel.alloc().buffer(1);
f.writeByte(b);
addFuture(channel.write(f));
}
}