Netty ByteToMessageDecoder 运行的次数超过了它必须运行的次数
Netty ByteToMessageDecoder runs more times than it has to
我正在尝试使用 netty 编写一个简单的客户端服务器应用程序。
我遵循了 this 教程,特别是时间服务器模型和 POJO 模型。我的问题是 ByteToMessageDecoder :它运行的次数比它必须运行的次数多,这意味着它不会在读取空 ByteBuf 时停止,而是会再读取一次,并且出于某种我无法理解的原因,它会找到我的客户拥有的上一条消息发送!我确定客户端只发送一次该消息!
所以想法是这样的:一个简单的客户端服务器模型,其中客户端发送 "DataPacket" 并在其中包含 "hello world" 消息,服务器以 [=36=] 响应 "ACK".我正在使用 DataPacket 类型,因为将来我想在其中传递更多的东西,添加一个 header 并构建一些更复杂的东西......但是对于初学者来说,我需要看看我在这方面做错了什么一个...
错误:
可以看到Server正常启动,我的Client(Transceiver)发送报文,encoder激活并从DataPacket转换为ByteBuf,从server收发报文,Server的Decoder激活并将其从 ByteBuf 转换为 DataPacket,然后服务器相应地处理它...它应该发送 ACK 并向后重复相同的操作,但这是出了问题,我不明白为什么。
我在这里阅读了一些帖子并且已经尝试过 LengthFieldBasedFrameDecoder,它没有用,我也想看看这个有什么问题,如果可能的话不要使用其他东西...
代码:
编码器和解码器class:
package org.client_server;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.CharsetUtil;
public class EncoderDecoder {
public static class NettyEncoder extends MessageToByteEncoder<DataPacket> {
@Override
protected void encode(ChannelHandlerContext ctx, DataPacket msg, ByteBuf out)
throws Exception {
System.out.println("Encode: "+msg.getData());
out.writeBytes(msg.convertData());
}
}
public static class NettyDecoder extends ByteToMessageDecoder{
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
if((in.readableBytes() < 4) ) {
return;
}
String msg = in.toString(CharsetUtil.UTF_8);
System.out.println("Decode:"+msg);
out.add(new DataPacket(msg));
}
}
}
服务器处理程序:
class DataAvroHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
try {
DataPacket in = (DataPacket)msg;
System.out.println("[Server]: Message received..."+in.getData());
}finally {
ReferenceCountUtil.release(msg);
//ctx.close();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
System.out.println("[Server]: Read Complete...");
DataPacket pkt = new DataPacket("ACK!");
//pkt.setData(Unpooled.copiedBuffer("ACK", CharsetUtil.UTF_8));
ctx.writeAndFlush(pkt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
serverLog.warning("[Server]: Error..." + cause.toString());
ctx.close();
}
客户端处理程序:
class DataAvroHandlerCl extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("[Transceiver]: Channel Active!!!");
DataPacket pkt = new DataPacket("Hello World!");
ChannelFuture f = ctx.writeAndFlush(pkt);
//f.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
DataPacket in = (DataPacket)msg;
System.out.println("[Transceiver]: Message received..."+in.getData());
}finally {
ReferenceCountUtil.release(msg);
//ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
transLog.warning("[Transceiver] : Error..." + cause.getMessage());
ctx.close();
}
}
服务器和客户端管道:
ch.pipeline().addLast("Decoder", new EncoderDecoder.NettyDecoder());
ch.pipeline().addLast("Encoder", new EncoderDecoder.NettyEncoder());
ch.pipeline().addLast("DataAvroHandler", new DataAvroHandler());
您的问题是由于在 NettyDecoder
.
中使用 ByteBuf in
的 toString()
方法引起的
引用自 javadoc (http://netty.io/4.0/api/io/netty/buffer/ByteBuf.html#toString%28java.nio.charset.Charset%29):
This method does not modify readerIndex or writerIndex of this buffer.
现在,ByteToMessageDecoder
不知道你实际解码了多少字节!看起来你解码了 0 个字节,因为缓冲区的 readerIndex 没有被修改,因此你也会在控制台中收到错误消息。
您必须手动修改 readerIndex:
String msg = in.toString(CharsetUtil.UTF_8);
in.readerIndex(in.readerIndex() + in.readableBytes());
System.out.println("Decode:"+msg);
我正在尝试使用 netty 编写一个简单的客户端服务器应用程序。 我遵循了 this 教程,特别是时间服务器模型和 POJO 模型。我的问题是 ByteToMessageDecoder :它运行的次数比它必须运行的次数多,这意味着它不会在读取空 ByteBuf 时停止,而是会再读取一次,并且出于某种我无法理解的原因,它会找到我的客户拥有的上一条消息发送!我确定客户端只发送一次该消息!
所以想法是这样的:一个简单的客户端服务器模型,其中客户端发送 "DataPacket" 并在其中包含 "hello world" 消息,服务器以 [=36=] 响应 "ACK".我正在使用 DataPacket 类型,因为将来我想在其中传递更多的东西,添加一个 header 并构建一些更复杂的东西......但是对于初学者来说,我需要看看我在这方面做错了什么一个...
错误:
可以看到Server正常启动,我的Client(Transceiver)发送报文,encoder激活并从DataPacket转换为ByteBuf,从server收发报文,Server的Decoder激活并将其从 ByteBuf 转换为 DataPacket,然后服务器相应地处理它...它应该发送 ACK 并向后重复相同的操作,但这是出了问题,我不明白为什么。
我在这里阅读了一些帖子并且已经尝试过 LengthFieldBasedFrameDecoder,它没有用,我也想看看这个有什么问题,如果可能的话不要使用其他东西...
代码:
编码器和解码器class:
package org.client_server;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.CharsetUtil;
public class EncoderDecoder {
public static class NettyEncoder extends MessageToByteEncoder<DataPacket> {
@Override
protected void encode(ChannelHandlerContext ctx, DataPacket msg, ByteBuf out)
throws Exception {
System.out.println("Encode: "+msg.getData());
out.writeBytes(msg.convertData());
}
}
public static class NettyDecoder extends ByteToMessageDecoder{
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
if((in.readableBytes() < 4) ) {
return;
}
String msg = in.toString(CharsetUtil.UTF_8);
System.out.println("Decode:"+msg);
out.add(new DataPacket(msg));
}
}
}
服务器处理程序:
class DataAvroHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
try {
DataPacket in = (DataPacket)msg;
System.out.println("[Server]: Message received..."+in.getData());
}finally {
ReferenceCountUtil.release(msg);
//ctx.close();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
System.out.println("[Server]: Read Complete...");
DataPacket pkt = new DataPacket("ACK!");
//pkt.setData(Unpooled.copiedBuffer("ACK", CharsetUtil.UTF_8));
ctx.writeAndFlush(pkt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
serverLog.warning("[Server]: Error..." + cause.toString());
ctx.close();
}
客户端处理程序:
class DataAvroHandlerCl extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("[Transceiver]: Channel Active!!!");
DataPacket pkt = new DataPacket("Hello World!");
ChannelFuture f = ctx.writeAndFlush(pkt);
//f.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
DataPacket in = (DataPacket)msg;
System.out.println("[Transceiver]: Message received..."+in.getData());
}finally {
ReferenceCountUtil.release(msg);
//ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
transLog.warning("[Transceiver] : Error..." + cause.getMessage());
ctx.close();
}
}
服务器和客户端管道:
ch.pipeline().addLast("Decoder", new EncoderDecoder.NettyDecoder());
ch.pipeline().addLast("Encoder", new EncoderDecoder.NettyEncoder());
ch.pipeline().addLast("DataAvroHandler", new DataAvroHandler());
您的问题是由于在 NettyDecoder
.
ByteBuf in
的 toString()
方法引起的
引用自 javadoc (http://netty.io/4.0/api/io/netty/buffer/ByteBuf.html#toString%28java.nio.charset.Charset%29):
This method does not modify readerIndex or writerIndex of this buffer.
现在,ByteToMessageDecoder
不知道你实际解码了多少字节!看起来你解码了 0 个字节,因为缓冲区的 readerIndex 没有被修改,因此你也会在控制台中收到错误消息。
您必须手动修改 readerIndex:
String msg = in.toString(CharsetUtil.UTF_8);
in.readerIndex(in.readerIndex() + in.readableBytes());
System.out.println("Decode:"+msg);