Netty、ProtoBuf、WebSocket;如何将 BinaryWebSocketFrame 转换为 protobuf 类型?
Netty, ProtoBuf, WebSocket; how to convert BinaryWebSocketFrame to protobuf type?
我有一个使用 protobuf 的服务器。处理程序定义为 -
public class ServerHandler extends SimpleChannelInboundHandler<MyProtoBufType> {
流水线如下-
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(MyProtoBufType.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast("ServerHandler", new ServerHandler());
有了它和 Java 客户端,一切正常。
现在,为了能够使用 Javascript websocket 进行连接,我修改了服务器管道如下 -
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new WebSocketServerCompressionHandler());
pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true));
pipeline.addLast(new MessageToMessageDecoder<WebSocketFrame>() {
@Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> objs) throws Exception {
ByteBuf buf = ((BinaryWebSocketFrame) frame).content();
objs.add(buf);
buf.retain();
}
});
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(MyProtoBufType.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast("ServerHandler", new ServerHandler());
以下是基于protobufjs库的Java脚本客户端代码-
var ProtoBuf = dcodeIO.ProtoBuf,
builder = ProtoBuf.loadProtoFile("./example.proto"),
pkg = builder.build("com.example");
socket = new WebSocket("ws://localhost:8484");
socket.binaryType = "arraybuffer";
socket.onopen = function() {
var login = new pkg.Login({"userName": "hello", "password": "pass"});
var wrapper = new pkg.MyProtoBufType();
wrapper.set("login", login);
socket.send(wrapper.toArrayBuffer());
};
当上述代码为运行时,服务器在protobuf解码器处失败。以下是堆栈跟踪 -
WARN [nioEventLoopGroup-3-6] (DefaultChannelPipeline.java:1144) - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.DecoderException: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either than the input has been truncated or that an embedded message misreported its own length.
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:98)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:335)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:288)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:275)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:391)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:243)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:335)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:327)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:335)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:327)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:335)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:327)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.handler.codec.http.websocketx.Utf8FrameValidator.channelRead(Utf8FrameValidator.java:77)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:335)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:327)
at io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler.channelRead(WebSocketServerProtocolHandler.java:159)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:335)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:327)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:335)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:288)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:335)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:327)
at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:233)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:335)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:327)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1336)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:335)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:544)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:742)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:145)
at java.lang.Thread.run(Unknown Source)
Caused by: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either than the input has been truncated or that an embedded message misreported its own length.
at com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:70)
at com.google.protobuf.CodedInputStream.readRawBytesSlowPath(CodedInputStream.java:1187)
at com.google.protobuf.CodedInputStream.readBytes(CodedInputStream.java:517)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:501)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:478)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:596)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:281)
at com.google.protobuf.CodedInputStream.readGroup(CodedInputStream.java:424)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:505)
at com.google.protobuf.GeneratedMessage.parseUnknownField(GeneratedMessage.java:205)
at com.exmaple.MyProtoBufType.<init>(CallBreakProtocol.java:15290)
at com.exmaple.MyProtoBufType.<init>(CallBreakProtocol.java:15273)
at com.exmaple.MyProtoBufType.parsePartialFrom(CallBreakProtocol.java:15638)
at com.exmaple.MyProtoBufType.parsePartialFrom(CallBreakProtocol.java:1)
at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:137)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:168)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:174)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:121)
at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:64)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
... 52 more
有没有人一起用过Netty、ProtoBuf和WebSocket?如何实现?
我只需要删除以下 -
pipeline.addLast(new ProtobufVarint32FrameDecoder());
现在一切正常。
我认为 ProtobufVarint32FrameDecoder 是 netty 本身的优化,这意味着客户端也需要根据此 https://issues.redhat.com/browse/NETTY-203?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel
使用 ProtobufVarint32LengthFieldPrepender
我有一个使用 protobuf 的服务器。处理程序定义为 -
public class ServerHandler extends SimpleChannelInboundHandler<MyProtoBufType> {
流水线如下-
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(MyProtoBufType.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast("ServerHandler", new ServerHandler());
有了它和 Java 客户端,一切正常。
现在,为了能够使用 Javascript websocket 进行连接,我修改了服务器管道如下 -
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new WebSocketServerCompressionHandler());
pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true));
pipeline.addLast(new MessageToMessageDecoder<WebSocketFrame>() {
@Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> objs) throws Exception {
ByteBuf buf = ((BinaryWebSocketFrame) frame).content();
objs.add(buf);
buf.retain();
}
});
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(MyProtoBufType.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast("ServerHandler", new ServerHandler());
以下是基于protobufjs库的Java脚本客户端代码-
var ProtoBuf = dcodeIO.ProtoBuf,
builder = ProtoBuf.loadProtoFile("./example.proto"),
pkg = builder.build("com.example");
socket = new WebSocket("ws://localhost:8484");
socket.binaryType = "arraybuffer";
socket.onopen = function() {
var login = new pkg.Login({"userName": "hello", "password": "pass"});
var wrapper = new pkg.MyProtoBufType();
wrapper.set("login", login);
socket.send(wrapper.toArrayBuffer());
};
当上述代码为运行时,服务器在protobuf解码器处失败。以下是堆栈跟踪 -
WARN [nioEventLoopGroup-3-6] (DefaultChannelPipeline.java:1144) - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.DecoderException: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either than the input has been truncated or that an embedded message misreported its own length.
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:98)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:335)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:288)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:275)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:391)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:243)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:335)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:327)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:335)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:327)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:335)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:327)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.handler.codec.http.websocketx.Utf8FrameValidator.channelRead(Utf8FrameValidator.java:77)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:335)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:327)
at io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler.channelRead(WebSocketServerProtocolHandler.java:159)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:335)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:327)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:335)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:288)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:335)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:327)
at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:233)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:335)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:327)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1336)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:335)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:544)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:742)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:145)
at java.lang.Thread.run(Unknown Source)
Caused by: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either than the input has been truncated or that an embedded message misreported its own length.
at com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:70)
at com.google.protobuf.CodedInputStream.readRawBytesSlowPath(CodedInputStream.java:1187)
at com.google.protobuf.CodedInputStream.readBytes(CodedInputStream.java:517)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:501)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:478)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:596)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:281)
at com.google.protobuf.CodedInputStream.readGroup(CodedInputStream.java:424)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:505)
at com.google.protobuf.GeneratedMessage.parseUnknownField(GeneratedMessage.java:205)
at com.exmaple.MyProtoBufType.<init>(CallBreakProtocol.java:15290)
at com.exmaple.MyProtoBufType.<init>(CallBreakProtocol.java:15273)
at com.exmaple.MyProtoBufType.parsePartialFrom(CallBreakProtocol.java:15638)
at com.exmaple.MyProtoBufType.parsePartialFrom(CallBreakProtocol.java:1)
at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:137)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:168)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:174)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:121)
at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:64)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
... 52 more
有没有人一起用过Netty、ProtoBuf和WebSocket?如何实现?
我只需要删除以下 -
pipeline.addLast(new ProtobufVarint32FrameDecoder());
现在一切正常。
我认为 ProtobufVarint32FrameDecoder 是 netty 本身的优化,这意味着客户端也需要根据此 https://issues.redhat.com/browse/NETTY-203?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel
使用 ProtobufVarint32LengthFieldPrepender