Netty文件传输导致异常
Netty file transfer causes exception
我正在尝试通过我的 netty 实现传输文件。我的自定义解码器和编码器都知道两种类型的对象:String
和 FileChunk,它实际上包含块的索引及其内容作为 byte[]
。转移工作如下:
- Client0 向 Client1 发送一个 JsonObject,作为
String
包含文件应保存的路径、文件的大小、将发送的切片数等。
- Client0 将第一个切片作为 FileChunk 发送,并阻塞线程以等待 Client1 成功收到其数据包的答复。
- Client1 收到文件的切片并将其写入磁盘。然后发送成功包给Client0
- Client0收到成功包,发送下一个分片。
在传输文件之前,此进度应继续进行。它有效。 如果我在发送 64kb 的文件片段之前添加 1 秒的延迟! Derp.
似乎没有错误 - 但它在重负载和不阻塞线程的情况下不起作用。我需要在任何地方清除缓冲区还是需要一份副本?请帮助...如果您对使用 IntelliJ 和 Maven 的示例项目感兴趣,请在评论中告诉我,我会准备好。
解释得够多了。这是代码!
FileTransfer 可运行
public class FileTransfer implements Runnable {
private FileSlicer slicer;
private Client client;
public FileTransfer(FileSlicer slicer, Client client) {
this.slicer = slicer;
this.client = client;
}
public void run() {
synchronized(this) {
while(slicer.hasNext()) {
try {
client.getContext().writeAndFlush(slicer.getNextSlice());
this.wait(); //Unblocked when success packet received, works
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
Channel-Initializer(此处设置缓冲区大小,默认不溢出)
@Override
protected void initChannel(Channel channel) throws Exception {
channel.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(1024 * 65));
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new PacketDecoder());
pipeline.addLast(new PacketEncoder());
pipeline.addLast(new ChannelEncoder());
pipeline.addLast(new ServerHandler());
}
Decoder(检测是String还是FileChunk并解析):
public class PacketDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> output) throws Exception {
int type = buf.readInt();
if (buf.readableBytes() <= 0) return;
byte[] buffer;
switch (type) {
case 0:
buffer = buf.readBytes(buf.readInt()).array();
output.add(new String(buffer));
break;
case 1:
int read = buf.readInt();
buffer = buf.readBytes(buf.readInt()).array();
output.add(new FileChunk(buffer, read));
break;
default:
System.out.println("Unknown Decodec.");
break;
}
}
}
堆栈跟踪:
io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(12) + length(65536) exceeds writerIndex(40960): PooledUnsafeDirectByteBuf(ridx: 12, widx: 40960, cap: 66560)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:347)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:230)
at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
at io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:331)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:116)
at io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
at io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
at io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
...
解码函数假定整个文件可用,但是缓冲区可能只包含部分数据,具体取决于接收到的流的数量。
解决此问题的一种方法是添加帧解码器,例如 LengthFieldBasedFrameDecoder,它根据消息中的长度字段对流进行分段,例如在您的示例中。
另一种选择是使用与文档中的文件服务器示例相同的方法:https://netty.io/4.1/xref/io/netty/example/http/file/package-summary.html
我正在尝试通过我的 netty 实现传输文件。我的自定义解码器和编码器都知道两种类型的对象:String
和 FileChunk,它实际上包含块的索引及其内容作为 byte[]
。转移工作如下:
- Client0 向 Client1 发送一个 JsonObject,作为
String
包含文件应保存的路径、文件的大小、将发送的切片数等。 - Client0 将第一个切片作为 FileChunk 发送,并阻塞线程以等待 Client1 成功收到其数据包的答复。
- Client1 收到文件的切片并将其写入磁盘。然后发送成功包给Client0
- Client0收到成功包,发送下一个分片。
在传输文件之前,此进度应继续进行。它有效。 如果我在发送 64kb 的文件片段之前添加 1 秒的延迟! Derp.
似乎没有错误 - 但它在重负载和不阻塞线程的情况下不起作用。我需要在任何地方清除缓冲区还是需要一份副本?请帮助...如果您对使用 IntelliJ 和 Maven 的示例项目感兴趣,请在评论中告诉我,我会准备好。
解释得够多了。这是代码!
FileTransfer 可运行
public class FileTransfer implements Runnable {
private FileSlicer slicer;
private Client client;
public FileTransfer(FileSlicer slicer, Client client) {
this.slicer = slicer;
this.client = client;
}
public void run() {
synchronized(this) {
while(slicer.hasNext()) {
try {
client.getContext().writeAndFlush(slicer.getNextSlice());
this.wait(); //Unblocked when success packet received, works
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
Channel-Initializer(此处设置缓冲区大小,默认不溢出)
@Override
protected void initChannel(Channel channel) throws Exception {
channel.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(1024 * 65));
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new PacketDecoder());
pipeline.addLast(new PacketEncoder());
pipeline.addLast(new ChannelEncoder());
pipeline.addLast(new ServerHandler());
}
Decoder(检测是String还是FileChunk并解析):
public class PacketDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> output) throws Exception {
int type = buf.readInt();
if (buf.readableBytes() <= 0) return;
byte[] buffer;
switch (type) {
case 0:
buffer = buf.readBytes(buf.readInt()).array();
output.add(new String(buffer));
break;
case 1:
int read = buf.readInt();
buffer = buf.readBytes(buf.readInt()).array();
output.add(new FileChunk(buffer, read));
break;
default:
System.out.println("Unknown Decodec.");
break;
}
}
}
堆栈跟踪:
io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(12) + length(65536) exceeds writerIndex(40960): PooledUnsafeDirectByteBuf(ridx: 12, widx: 40960, cap: 66560)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:347)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:230)
at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
at io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:331)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:116)
at io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
at io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
at io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
...
解码函数假定整个文件可用,但是缓冲区可能只包含部分数据,具体取决于接收到的流的数量。
解决此问题的一种方法是添加帧解码器,例如 LengthFieldBasedFrameDecoder,它根据消息中的长度字段对流进行分段,例如在您的示例中。
另一种选择是使用与文档中的文件服务器示例相同的方法:https://netty.io/4.1/xref/io/netty/example/http/file/package-summary.html