Netty:TCP 文件传输无法正常工作
Netty: TCP file transfer doesn't work correctly
我正在处理我的在线文件存储,今天我的 Netty TCP 文件传输遇到了一些问题。所以问题是实际上只有 8192 字节的数据写入客户端的文件中。
我想知道问题出在哪里,我该如何解决。
我已经看过所有其他 (5) 个 Whosebug 问题。
这是我的服务器 bootstrap:
package com.martin.main;
import com.martin.file.*;
import com.martin.handler.*;
import com.martin.utils.*;
import io.netty.bootstrap.*;
import io.netty.channel.*;
import io.netty.channel.nio.*;
import io.netty.channel.socket.*;
import io.netty.channel.socket.nio.*;
import io.netty.handler.codec.*;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
public class Server {
public static void main(String[] args) {
new Server().setup(4783);
}
public ChannelFuture setup(int port) {
ChannelFuture future = null;
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
final ServerBootstrap bootstrap;
// OnlineFile.getOnlineFiles().add(new OnlineFile(737389, new File("C:\Users\marti\Desktop\filesharing\testek.txt"), "ok.txt", (int) System.currentTimeMillis(), false, "C:\Users\marti\Desktop\filesharing\testek.txt", false, null));
try {
bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//pipeline.addLast("framer", new LengthFieldBasedFrameDecoder());
pipeline.addLast("framer", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast("login", new LoginServerHandler()); //the problem is not in that handler
}
});
future = bootstrap.bind(new InetSocketAddress(port)).sync().channel().closeFuture().sync();
}catch(InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
return future;
}
}
我的服务器处理程序:
public class ServerHandler extends ByteToMessageDecoder {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception {
byte msgType = byteBuf.readByte();
if(FILEPACKETINFO == msgType) {
//The problem is probaly here
System.out.println("inide file req.");
int id = byteBuf.readInt();
for(OnlineFile onlineFile : OnlineFile.getOnlineFiles()) {
if(onlineFile.getId() == id) {
System.out.println("file request id: " + id + " actual id: " + onlineFile.getId());
ByteBuf buf = Unpooled.buffer();
buf.writeByte(FILEPACKETINFO);
String fileName = onlineFile.getName();
File file = onlineFile.getActualFile();
buf.writeLong(file.length());
buf.writeInt(fileName.length());
buf.writeCharSequence(fileName, CharsetUtil.US_ASCII);
ctx.writeAndFlush(buf);
if(!(file.length() <= 0)) {
ctx.writeAndFlush(new ChunkedFile(file)).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("written successfully!");
if (channelFuture.cause() != null) channelFuture.cause().printStackTrace();
}
});
} else {
System.out.println("length 0: " + " (" + file.length() +")");
}
}
}
} else if(REQUESTVISUALFILES == msgType) {
createAndSend(ctx); //send VISUAL files, problem not there
} else if(REQUESTFOLDERFILESBYID == msgType) {
int id = byteBuf.readInt();
createAndSend(ctx, id); //send VISUAL files by id, problem not there
}
}
我的客户bootstrap:
public static ChannelFuture setup(String host, int port) {
ChannelFuture channelFuture = null;
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("framer", new LengthFieldBasedFrameDecoder(64*1024, 0, 4, 0, 4));
pipeline.addLast(lfp);
pipeline.addLast("login", new LoginHandler()); //the problem is not in that handler either
}
});
channelFuture = bootstrap.connect(new InetSocketAddress(host, port)).sync();
System.out.println("the setup came to an end!");
LoginForm.createAndShowGUI("Login");
mainChannel = channelFuture.channel();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
return channelFuture;
}
}
我的客户处理程序:
package com.martin.handler;
import io.netty.buffer.*;
import io.netty.channel.*;
import io.netty.util.*;
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
public class FileChunkHandler extends SimpleChannelInboundHandler<ByteBuf> {
public static String currentFileName = "Test.txt";
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.pipeline().remove(this);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
//the problem is probably here
System.out.println("inside FileChunkHandler current file: " + currentFileName);
ByteBuffer buffer = byteBuf.nioBuffer();
System.out.println(buffer.capacity() + " buffer" + " bytebuf: " + byteBuf.readableBytes());
File file = triggerFileCreation();
// FileOutputStream fos = new FileOutputStream("C:\Users\marti\storage\" + currentFileName);
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
FileChannel channel = randomAccessFile.getChannel();
while(buffer.hasRemaining()) {
channel.position(file.length());
channel.write(buffer);
System.out.println("chunk has just been written");
}
channel.close();
randomAccessFile.close();
//!!!--->IMPORTANT<-----!!!
ctx.pipeline().remove(this);
}
public static File triggerFileCreation() {
File file = new File(System.getProperty("user.home") + "/storage/" + currentFileName);
if(!file.exists()) {
try {
file.createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
}
return file;
}
}
编辑
问题是我删除处理程序的速度太快,然后其他数据包在其他 handlers/were 中处理,从管道中丢弃。
问题确实最有可能出在您的 FileChunkHandler 中。
您只读取一个缓冲区(可能包含 8192 字节 - 8kB),然后删除处理程序。剩余的块要么被管道中的其他处理程序“处理”,要么到达管道的末端并被丢弃。如 Discord 中所述,您需要跟踪您期望的字节数,减去收到的数字,只有当该数字达到 0 时,您才应该删除处理程序。
这里可以做一些代码优化
如果您要做的只是将数据从 ByteBuffer 传输到文件,那么
替换此代码
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
//the problem is probably here
System.out.println("inside FileChunkHandler current file: " + currentFileName);
ByteBuffer buffer = byteBuf.nioBuffer();
System.out.println(buffer.capacity() + " buffer" + " bytebuf: " + byteBuf.readableBytes());
File file = triggerFileCreation();
// FileOutputStream fos = new FileOutputStream("C:\Users\marti\storage\" + currentFileName);
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
FileChannel channel = randomAccessFile.getChannel();
while(buffer.hasRemaining()) {
channel.position(file.length());
channel.write(buffer);
System.out.println("chunk has just been written");
}
channel.close();
randomAccessFile.close();
//!!!--->IMPORTANT<-----!!!
ctx.pipeline().remove(this);
}
public static File triggerFileCreation() {
File file = new File(System.getProperty("user.home") + "/storage/" + currentFileName);
if(!file.exists()) {
try {
file.createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
}
return file;
}
有了这个
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception
{
System.out.println("inside FileChunkHandler current file: " + currentFileName);
ByteBuffer buffer = byteBuf.nioBuffer();
System.out.println(buffer.capacity() + " buffer" + " bytebuf: " + byteBuf.readableBytes());
try(FileChannel channel=FileChannel.open(Path.of(System.getProperty("user.home"),"/storage/",currentFileName),StandardOpenOption.CREATE,StandardOpenOption.APPEND))
{
while(buffer.hasRemaining())
{
channel.write(buffer);
System.out.println("chunk has just been written");
}
}
//!!!--->IMPORTANT<-----!!!
ctx.pipeline().remove(this);
}
至于为什么你只能从服务器获得 8KB 的字节,我不确定。也许您的服务器重用了相同的 ByteBuffer,如果是这种情况,那么您必须在将其写入文件后清除它以使其 position=0 和 limit=buffer.capacity() 以便服务器可以使用新的 space 向其写入更多字节
而且由于您正在注册一个处理程序来接收新块,因此在您的文件达到预期大小时删除该处理程序可能是个好主意
把这两个放在一起最终的代码如下
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception
{
System.out.println("inside FileChunkHandler current file: " + currentFileName);
ByteBuffer buffer = byteBuf.nioBuffer();
System.out.println(buffer.capacity() + " buffer" + " bytebuf: " + byteBuf.readableBytes());
try(FileChannel channel=FileChannel.open(Path.of(System.getProperty("user.home"),"/storage/",currentFileName),StandardOpenOption.CREATE,StandardOpenOption.APPEND))
{
while(buffer.hasRemaining())
{
channel.write(buffer);
System.out.println("chunk has just been written");
}
//clear the buffer so the server has space to transfer new chunks into it
buffer.clear();
//Remove handler only after file has reached an certain size
if(channel.size()>=expectedLength){
ctx.pipeline().remove(this);
}
}
}
除了代码优化之外,其他的只是猜测,可能不是您要找的答案。如果有任何进展,请在下方评论
我正在处理我的在线文件存储,今天我的 Netty TCP 文件传输遇到了一些问题。所以问题是实际上只有 8192 字节的数据写入客户端的文件中。 我想知道问题出在哪里,我该如何解决。
我已经看过所有其他 (5) 个 Whosebug 问题。
这是我的服务器 bootstrap:
package com.martin.main;
import com.martin.file.*;
import com.martin.handler.*;
import com.martin.utils.*;
import io.netty.bootstrap.*;
import io.netty.channel.*;
import io.netty.channel.nio.*;
import io.netty.channel.socket.*;
import io.netty.channel.socket.nio.*;
import io.netty.handler.codec.*;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
public class Server {
public static void main(String[] args) {
new Server().setup(4783);
}
public ChannelFuture setup(int port) {
ChannelFuture future = null;
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
final ServerBootstrap bootstrap;
// OnlineFile.getOnlineFiles().add(new OnlineFile(737389, new File("C:\Users\marti\Desktop\filesharing\testek.txt"), "ok.txt", (int) System.currentTimeMillis(), false, "C:\Users\marti\Desktop\filesharing\testek.txt", false, null));
try {
bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//pipeline.addLast("framer", new LengthFieldBasedFrameDecoder());
pipeline.addLast("framer", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast("login", new LoginServerHandler()); //the problem is not in that handler
}
});
future = bootstrap.bind(new InetSocketAddress(port)).sync().channel().closeFuture().sync();
}catch(InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
return future;
}
}
我的服务器处理程序:
public class ServerHandler extends ByteToMessageDecoder {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception {
byte msgType = byteBuf.readByte();
if(FILEPACKETINFO == msgType) {
//The problem is probaly here
System.out.println("inide file req.");
int id = byteBuf.readInt();
for(OnlineFile onlineFile : OnlineFile.getOnlineFiles()) {
if(onlineFile.getId() == id) {
System.out.println("file request id: " + id + " actual id: " + onlineFile.getId());
ByteBuf buf = Unpooled.buffer();
buf.writeByte(FILEPACKETINFO);
String fileName = onlineFile.getName();
File file = onlineFile.getActualFile();
buf.writeLong(file.length());
buf.writeInt(fileName.length());
buf.writeCharSequence(fileName, CharsetUtil.US_ASCII);
ctx.writeAndFlush(buf);
if(!(file.length() <= 0)) {
ctx.writeAndFlush(new ChunkedFile(file)).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("written successfully!");
if (channelFuture.cause() != null) channelFuture.cause().printStackTrace();
}
});
} else {
System.out.println("length 0: " + " (" + file.length() +")");
}
}
}
} else if(REQUESTVISUALFILES == msgType) {
createAndSend(ctx); //send VISUAL files, problem not there
} else if(REQUESTFOLDERFILESBYID == msgType) {
int id = byteBuf.readInt();
createAndSend(ctx, id); //send VISUAL files by id, problem not there
}
}
我的客户bootstrap:
public static ChannelFuture setup(String host, int port) {
ChannelFuture channelFuture = null;
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("framer", new LengthFieldBasedFrameDecoder(64*1024, 0, 4, 0, 4));
pipeline.addLast(lfp);
pipeline.addLast("login", new LoginHandler()); //the problem is not in that handler either
}
});
channelFuture = bootstrap.connect(new InetSocketAddress(host, port)).sync();
System.out.println("the setup came to an end!");
LoginForm.createAndShowGUI("Login");
mainChannel = channelFuture.channel();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
return channelFuture;
}
}
我的客户处理程序:
package com.martin.handler;
import io.netty.buffer.*;
import io.netty.channel.*;
import io.netty.util.*;
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
public class FileChunkHandler extends SimpleChannelInboundHandler<ByteBuf> {
public static String currentFileName = "Test.txt";
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.pipeline().remove(this);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
//the problem is probably here
System.out.println("inside FileChunkHandler current file: " + currentFileName);
ByteBuffer buffer = byteBuf.nioBuffer();
System.out.println(buffer.capacity() + " buffer" + " bytebuf: " + byteBuf.readableBytes());
File file = triggerFileCreation();
// FileOutputStream fos = new FileOutputStream("C:\Users\marti\storage\" + currentFileName);
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
FileChannel channel = randomAccessFile.getChannel();
while(buffer.hasRemaining()) {
channel.position(file.length());
channel.write(buffer);
System.out.println("chunk has just been written");
}
channel.close();
randomAccessFile.close();
//!!!--->IMPORTANT<-----!!!
ctx.pipeline().remove(this);
}
public static File triggerFileCreation() {
File file = new File(System.getProperty("user.home") + "/storage/" + currentFileName);
if(!file.exists()) {
try {
file.createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
}
return file;
}
}
编辑
问题是我删除处理程序的速度太快,然后其他数据包在其他 handlers/were 中处理,从管道中丢弃。
问题确实最有可能出在您的 FileChunkHandler 中。
您只读取一个缓冲区(可能包含 8192 字节 - 8kB),然后删除处理程序。剩余的块要么被管道中的其他处理程序“处理”,要么到达管道的末端并被丢弃。如 Discord 中所述,您需要跟踪您期望的字节数,减去收到的数字,只有当该数字达到 0 时,您才应该删除处理程序。
这里可以做一些代码优化
如果您要做的只是将数据从 ByteBuffer 传输到文件,那么 替换此代码
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
//the problem is probably here
System.out.println("inside FileChunkHandler current file: " + currentFileName);
ByteBuffer buffer = byteBuf.nioBuffer();
System.out.println(buffer.capacity() + " buffer" + " bytebuf: " + byteBuf.readableBytes());
File file = triggerFileCreation();
// FileOutputStream fos = new FileOutputStream("C:\Users\marti\storage\" + currentFileName);
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
FileChannel channel = randomAccessFile.getChannel();
while(buffer.hasRemaining()) {
channel.position(file.length());
channel.write(buffer);
System.out.println("chunk has just been written");
}
channel.close();
randomAccessFile.close();
//!!!--->IMPORTANT<-----!!!
ctx.pipeline().remove(this);
}
public static File triggerFileCreation() {
File file = new File(System.getProperty("user.home") + "/storage/" + currentFileName);
if(!file.exists()) {
try {
file.createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
}
return file;
}
有了这个
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception
{
System.out.println("inside FileChunkHandler current file: " + currentFileName);
ByteBuffer buffer = byteBuf.nioBuffer();
System.out.println(buffer.capacity() + " buffer" + " bytebuf: " + byteBuf.readableBytes());
try(FileChannel channel=FileChannel.open(Path.of(System.getProperty("user.home"),"/storage/",currentFileName),StandardOpenOption.CREATE,StandardOpenOption.APPEND))
{
while(buffer.hasRemaining())
{
channel.write(buffer);
System.out.println("chunk has just been written");
}
}
//!!!--->IMPORTANT<-----!!!
ctx.pipeline().remove(this);
}
至于为什么你只能从服务器获得 8KB 的字节,我不确定。也许您的服务器重用了相同的 ByteBuffer,如果是这种情况,那么您必须在将其写入文件后清除它以使其 position=0 和 limit=buffer.capacity() 以便服务器可以使用新的 space 向其写入更多字节
而且由于您正在注册一个处理程序来接收新块,因此在您的文件达到预期大小时删除该处理程序可能是个好主意
把这两个放在一起最终的代码如下
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception
{
System.out.println("inside FileChunkHandler current file: " + currentFileName);
ByteBuffer buffer = byteBuf.nioBuffer();
System.out.println(buffer.capacity() + " buffer" + " bytebuf: " + byteBuf.readableBytes());
try(FileChannel channel=FileChannel.open(Path.of(System.getProperty("user.home"),"/storage/",currentFileName),StandardOpenOption.CREATE,StandardOpenOption.APPEND))
{
while(buffer.hasRemaining())
{
channel.write(buffer);
System.out.println("chunk has just been written");
}
//clear the buffer so the server has space to transfer new chunks into it
buffer.clear();
//Remove handler only after file has reached an certain size
if(channel.size()>=expectedLength){
ctx.pipeline().remove(this);
}
}
}
除了代码优化之外,其他的只是猜测,可能不是您要找的答案。如果有任何进展,请在下方评论