Netty:TCP 文件传输无法正常工作

Netty: TCP file transfer doesn't work correctly

我正在处理我的在线文件存储,今天我的 Netty TCP 文件传输遇到了一些问题。所以问题是实际上只有 8192 字节的数据写入客户端的文件中。 我想知道问题出在哪里,我该如何解决。

我已经看过所有其他 (5) 个 Whosebug 问题。

这是我的服务器 bootstrap:

package com.martin.main;

import com.martin.file.*;
import com.martin.handler.*;
import com.martin.utils.*;
import io.netty.bootstrap.*;
import io.netty.channel.*;
import io.netty.channel.nio.*;
import io.netty.channel.socket.*;
import io.netty.channel.socket.nio.*;
import io.netty.handler.codec.*;

import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;

public class Server {

    public static void main(String[] args) {
        new Server().setup(4783);
    }

    public ChannelFuture setup(int port) {
        ChannelFuture future = null;
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final ServerBootstrap bootstrap;

       // OnlineFile.getOnlineFiles().add(new OnlineFile(737389, new File("C:\Users\marti\Desktop\filesharing\testek.txt"), "ok.txt", (int) System.currentTimeMillis(), false, "C:\Users\marti\Desktop\filesharing\testek.txt", false, null));

     
        try {
            bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    //pipeline.addLast("framer", new LengthFieldBasedFrameDecoder());
                    pipeline.addLast("framer", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                    pipeline.addLast(new LengthFieldPrepender(4));
                    pipeline.addLast("login", new LoginServerHandler()); //the problem is not in that handler
                }
            });
            future = bootstrap.bind(new InetSocketAddress(port)).sync().channel().closeFuture().sync();
        }catch(InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
        return future;
    }

}

我的服务器处理程序:

public class ServerHandler extends ByteToMessageDecoder {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception {

        byte msgType = byteBuf.readByte();

        if(FILEPACKETINFO == msgType) {
            //The problem is probaly here
            System.out.println("inide file req.");
            int id = byteBuf.readInt();

            for(OnlineFile onlineFile : OnlineFile.getOnlineFiles()) {
                if(onlineFile.getId() == id) {
                    System.out.println("file request id: " + id + " actual id: " + onlineFile.getId());
                    ByteBuf buf = Unpooled.buffer();
                    buf.writeByte(FILEPACKETINFO);
                    String fileName = onlineFile.getName();
                    File file = onlineFile.getActualFile();
                    buf.writeLong(file.length());
                    buf.writeInt(fileName.length());
                    buf.writeCharSequence(fileName, CharsetUtil.US_ASCII);
                    ctx.writeAndFlush(buf);

                    if(!(file.length() <= 0)) {

                        ctx.writeAndFlush(new ChunkedFile(file)).addListener(new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                                System.out.println("written successfully!");
                                if (channelFuture.cause() != null) channelFuture.cause().printStackTrace();
                            }
                        });
                    } else {
                        System.out.println("length 0: " + " (" + file.length() +")");
                    }
                }
            }
        } else if(REQUESTVISUALFILES == msgType) {
            createAndSend(ctx); //send VISUAL files, problem not there
        } else if(REQUESTFOLDERFILESBYID == msgType) {
            int id = byteBuf.readInt();
            createAndSend(ctx, id); //send VISUAL files by id, problem not there
        }
    }

我的客户bootstrap:

    public static ChannelFuture setup(String host, int port) {
        ChannelFuture channelFuture = null;

        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    pipeline.addLast("framer", new LengthFieldBasedFrameDecoder(64*1024, 0, 4, 0, 4));
                    pipeline.addLast(lfp);
                    pipeline.addLast("login", new LoginHandler()); //the problem is not in that handler either
                }
            });
            channelFuture = bootstrap.connect(new InetSocketAddress(host, port)).sync();
            System.out.println("the setup came to an end!");
            LoginForm.createAndShowGUI("Login");
            mainChannel = channelFuture.channel();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
        return channelFuture;
    }

}

我的客户处理程序:

package com.martin.handler;

import io.netty.buffer.*;
import io.netty.channel.*;
import io.netty.util.*;

import java.io.*;
import java.nio.*;
import java.nio.channels.*;

public class FileChunkHandler extends SimpleChannelInboundHandler<ByteBuf> {

    public static String currentFileName = "Test.txt";

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.pipeline().remove(this);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
        //the problem is probably here
        System.out.println("inside FileChunkHandler current file: " + currentFileName);
        ByteBuffer buffer = byteBuf.nioBuffer();
        System.out.println(buffer.capacity() + " buffer" + " bytebuf: " + byteBuf.readableBytes());

        File file = triggerFileCreation();

       // FileOutputStream fos = new FileOutputStream("C:\Users\marti\storage\" + currentFileName);
        RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
        FileChannel channel = randomAccessFile.getChannel();

        while(buffer.hasRemaining()) {
            channel.position(file.length());
            channel.write(buffer);
            System.out.println("chunk has just been written");
        }
        channel.close();
        randomAccessFile.close();

        //!!!--->IMPORTANT<-----!!!
        ctx.pipeline().remove(this);
    }
    public static File triggerFileCreation() {
        File file = new File(System.getProperty("user.home") + "/storage/" + currentFileName);
        if(!file.exists()) {
            try {
                file.createNewFile();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return file;
    }
}

编辑

问题是我删除处理程序的速度太快,然后其他数据包在其他 handlers/were 中处理,从管道中丢弃。

问题确实最有可能出在您的 FileChunkHandler 中。

您只读取一个缓冲区(可能包含 8192 字节 - 8kB),然后删除处理程序。剩余的块要么被管道中的其他处理程序“处理”,要么到达管道的末端并被丢弃。如 Discord 中所述,您需要跟踪您期望的字节数,减去收到的数字,只有当该数字达到 0 时,您才应该删除处理程序。

这里可以做一些代码优化

如果您要做的只是将数据从 ByteBuffer 传输到文件,那么 替换此代码

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
        //the problem is probably here
        System.out.println("inside FileChunkHandler current file: " + currentFileName);
        ByteBuffer buffer = byteBuf.nioBuffer();
        System.out.println(buffer.capacity() + " buffer" + " bytebuf: " + byteBuf.readableBytes());

        File file = triggerFileCreation();

       // FileOutputStream fos = new FileOutputStream("C:\Users\marti\storage\" + currentFileName);
        RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
        FileChannel channel = randomAccessFile.getChannel();

        while(buffer.hasRemaining()) {
            channel.position(file.length());
            channel.write(buffer);
            System.out.println("chunk has just been written");
        }
        channel.close();
        randomAccessFile.close();

        //!!!--->IMPORTANT<-----!!!
        ctx.pipeline().remove(this);
    }

    public static File triggerFileCreation() {
        File file = new File(System.getProperty("user.home") + "/storage/" + currentFileName);
        if(!file.exists()) {
            try {
                file.createNewFile();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return file;
    }

有了这个

 @Override
 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception
 {
  System.out.println("inside FileChunkHandler current file: " + currentFileName);
  ByteBuffer buffer = byteBuf.nioBuffer();                  
  System.out.println(buffer.capacity() + " buffer" + " bytebuf: " + byteBuf.readableBytes());
              
  try(FileChannel channel=FileChannel.open(Path.of(System.getProperty("user.home"),"/storage/",currentFileName),StandardOpenOption.CREATE,StandardOpenOption.APPEND))
  {
   while(buffer.hasRemaining())
   {
    channel.write(buffer);
    System.out.println("chunk has just been written");
   }
  }

  //!!!--->IMPORTANT<-----!!!
  ctx.pipeline().remove(this);
 }

至于为什么你只能从服务器获得 8KB 的字节,我不确定。也许您的服务器重用了相同的 ByteBuffer,如果是这种情况,那么您必须在将其写入文件后清除它以使其 position=0 和 limit=buffer.capacity() 以便服务器可以使用新的 space 向其写入更多字节

而且由于您正在注册一个处理程序来接收新块,因此在您的文件达到预期大小时删除该处理程序可能是个好主意

把这两个放在一起最终的代码如下

@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception
{
 System.out.println("inside FileChunkHandler current file: " + currentFileName);
 ByteBuffer buffer = byteBuf.nioBuffer();                  
 System.out.println(buffer.capacity() + " buffer" + " bytebuf: " + byteBuf.readableBytes());
                  
 try(FileChannel channel=FileChannel.open(Path.of(System.getProperty("user.home"),"/storage/",currentFileName),StandardOpenOption.CREATE,StandardOpenOption.APPEND))
 {
  while(buffer.hasRemaining())
  {
   channel.write(buffer);
   System.out.println("chunk has just been written");
  }

  //clear the buffer so the server has space to transfer new chunks into it
  buffer.clear();
         
  //Remove handler only after file has reached an certain size
  if(channel.size()>=expectedLength){
   ctx.pipeline().remove(this);
  }
 }          
}

除了代码优化之外,其他的只是猜测,可能不是您要找的答案。如果有任何进展,请在下方评论