java nio:文件传输不完整

java nio: incomplete file transfer

我正在尝试使用 java NIO 将大型视频文件从客户端传输到服务器。我似乎需要使用 NIO,因为我要发送的文件比常规 IO 的明显文件大小限制大得多,大约 2GB……我的视频文件每个都大到 50GB。现在,我只是想构建一个小程序来理解这些概念。稍后将添加到更大的程序中。

我的问题在于服务器上只保存了文件的前几百 KB。每次我 运行 它时,服务器上都会保存不同的数据。谁能帮我解决问题? (以及您可能有的任何其他建议...NIO 对我来说是新手)谢谢!

工作原理如下:

客户端将有一组文件要发送到服务器。客户端将与服务器建立连接,服务器将回复说它已准备就绪。客户端发送文件头信息。服务器然后说它已准备好接收文件内容。然后客户端发送文件的内容。当文件完全传输后,它会重复下一个文件,直到不再需要发送文件为止。

主要客户

public static void main(String[] args) throws Throwable {
    FileSender fileSender = new FileSender("localhost", 7146);
    fileSender.addFileToSend(new File("C:\url\to\file1.jpg"));
    fileSender.addFileToSend(new File("C:\url\to\file2.jpg"));
    fileSender.addFileToSend(new File("C:\url\to\file3.jpg"));
    fileSender.sendFiles();
}

文件发送者

private static String serverAddress;
private static int port;
private static Charset charSet = Charset.forName(System.getProperty("file.encoding"));

private SocketChannel server = null;
private File file;
private RandomAccessFile aFile;
private FileChannel fileChannel;
private long filesize, transmittedSoFar;
private int current;
private ByteBuffer buffer = ByteBuffer.allocate(131072); //128k
private ByteBuffer responseBuffer;
private CharBuffer charBuffer;
private CharsetDecoder charDecoder = charSet.newDecoder();
private Selector selector;
private ArrayList<File> filesToSend = new ArrayList<>(0);
private int fileCountTracker = 0;

FileSender(String serverAddress, int port) {
    FileSender.serverAddress = serverAddress;
    FileSender.port = port;
}

public void sendFiles() {
    try {
        server = SocketChannel.open();
        server.connect(new InetSocketAddress(serverAddress, port));
        server.configureBlocking(false);
        System.out.println("Connected to Server");
        selector = Selector.open();
        server.register(selector, SelectionKey.OP_READ);
        waitForResponse();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

void waitForResponse() throws Exception {
    //TODO: track time. abort loop after 10 sec? 30 sec?
    while (true) {
        System.out.println("waiting for a response from server");
        selector.select();
        Set<SelectionKey> readyKeys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = readyKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = (SelectionKey) iterator.next();
            iterator.remove();
            if (key.isReadable()) {
                responseBuffer = ByteBuffer.allocate(16);
                server.read(responseBuffer);
                responseBuffer.flip();

                try {
                    charBuffer = charDecoder.decode(responseBuffer);
                    responseBuffer.clear();
                    String response = charBuffer.toString();
                    System.out.println(response);
                    if (response.startsWith("[readyForHeader]")) {
                        System.out.println("Received response: ready for header");
                        sendHeader();
                    }
                    else if (response.startsWith("[readyForBody]")) {
                        System.out.println("Received response: ready for body");
                        sendData();
                    }
                    else {
                        System.out.println("unknown response");
                        System.out.println(response);
                    }
                } catch(Exception e) {
                    System.out.println("error decoding file info");
                    System.out.println(e.getMessage());
                    return;
                }
            }
        }
    }
}

public void addFileToSend(File file) {
    filesToSend.add(file);
}

void sendHeader() {
    System.out.println("Tracker: "+fileCountTracker);
    try {
        if (filesToSend.size() > fileCountTracker) { //still more files to send
            System.out.println("a file exists at this array index");
            this.file = filesToSend.get(fileCountTracker);
            filesize = file.length();
            aFile = new RandomAccessFile(file, "r");
            transmittedSoFar = 0;

            //generate file info buffers to send to server
            byte[] fileInfoBytes = getFileMeta(file);
            ByteBuffer lengthBuffer = ByteBuffer.allocate(4); //length of file info
            lengthBuffer.putInt(0, fileInfoBytes.length);
            System.out.println("Source info length: "+fileInfoBytes.length);
            ByteBuffer infoBuffer = ByteBuffer.wrap(fileInfoBytes); //file info data

            //send file info buffers
            sendByteBuffer(lengthBuffer);
            sendByteBuffer(infoBuffer);
        } else {
            System.out.println("sending zero to indicate no more files");
            ByteBuffer lengthBuffer = ByteBuffer.allocate(4); //length of file info
            lengthBuffer.putInt(0, 0); //tell server sending zero bytes. server will end connection
            sendByteBuffer(lengthBuffer);
            terminate();
        }


    }
    catch (Exception e) {
        e.getMessage();
        terminate();
    }
}

void sendData() {
    try {
        fileChannel = aFile.getChannel();

        while ((current = fileChannel.read(buffer)) > 0 || buffer.position() > 0) {
            transmittedSoFar = transmittedSoFar + (long)current;
            System.out.println(Math.round(transmittedSoFar*100/filesize)+" "+transmittedSoFar);
            buffer.flip();
            server.write(buffer);
            buffer.compact();
        }
        System.out.println("End of file reached..");
        aFile.close();
    } catch (FileNotFoundException e) {
        System.out.println("FILE NOT FOUND EXCEPTION");
        e.getMessage();
    } catch (IOException e) {
        System.out.println("IO EXCEPTION");
        e.getMessage();
    }
    fileCountTracker++;
}

byte[] getFileMeta(File file) throws IOException {
    StringBuffer fileInfo = new StringBuffer();

    BasicFileAttributes attr = Files.readAttributes(file.toPath(), BasicFileAttributes.class);

    fileInfo.append(file.getName() + "\n");
    fileInfo.append(file.length() + "\n");
    fileInfo.append(attr.creationTime() + "\n");

    byte[] infoBytes = fileInfo.toString().getBytes();

    return infoBytes;
}

void sendByteBuffer(ByteBuffer bb) throws IOException {
    System.out.println("sending: "+bb.toString());
    server.write(bb);
    bb.rewind();
}

void terminate() {
    try {
        server.close();
        System.out.println("Connection closed");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

主服务器

public static void main(String[] args) throws Throwable {
    FileReceiver fileReceiver = new FileReceiver(7146);
    fileReceiver.initReceive();
}

文件接收器

static Charset charSet = Charset.forName(System.getProperty("file.encoding"));
static final Pattern pattern = Pattern.compile("[\n]");//new line
static int port;
static BytesTypeToReceive bytesType;

ServerSocketChannel server;
SocketChannel client;

ByteBuffer byteBuffer, responseBuffer;
CharBuffer charBuffer;
CharsetDecoder charDecoder = charSet.newDecoder();
RandomAccessFile aFile = null;
String fileInfo[];
int headerLength;
long remaining;
Selector selector;

public FileReceiver(int port) {
    FileReceiver.port = port;
}

public void initReceive() {
    try {
        server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.socket().bind(new InetSocketAddress(port));
        selector = Selector.open();
        server.register(selector, SelectionKey.OP_ACCEPT);
        waitForResponse();
    } catch (Exception e) {
        close();
        e.printStackTrace();
    }
}

void waitForResponse() throws Exception {
    while (true) {
        System.out.println("Waiting for data from client");
        int selCount = selector.select();
        System.out.println("selector count: "+selCount);
        Set<SelectionKey> readyKeys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = readyKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = (SelectionKey) iterator.next();
            iterator.remove();
            if (key.isReadable()) {
                if (bytesType == BytesTypeToReceive.HEADER) {
                    receiveHeader();
                } else {
                    receiveBody();
                }
            } else if (key.isAcceptable()) {
                client = server.accept();
                System.out.println("Connection established...." + client.getRemoteAddress());
                client.configureBlocking(false);
                bytesType = BytesTypeToReceive.HEADER;
                client.register(selector, SelectionKey.OP_READ);
                sendResponse("[readyForHeader]");
            }
        }
        Thread.sleep(250);
    }
}

private void receiveHeader() {
    System.out.println("Receiving header data");
    byteBuffer = ByteBuffer.allocate(4);

    try {
        //read length
        while (byteBuffer.remaining() > 0) client.read(byteBuffer);
        System.out.println("what is this? "+byteBuffer.toString());
        byteBuffer.rewind();
        System.out.println("and this? "+byteBuffer.toString());
        System.out.println("Info length is " + byteBuffer.getInt(0));

        if (byteBuffer.getInt(0) == 0) {
            System.out.println("no more files. end connection");
            throw new IOException();
        }

        //resize to size indicated in first buffer
        byteBuffer = ByteBuffer.allocate(byteBuffer.getInt(0));

        //read file info
        while (byteBuffer.remaining() > 0) client.read(byteBuffer);
        byteBuffer.flip();

        //decode file info
        try {
            charBuffer = charDecoder.decode(byteBuffer);
            byteBuffer.clear();
             System.out.println(charBuffer.toString());
        } catch(Exception e) {
            System.out.println("error decoding file info");
            return;
        }
        fileInfo = pattern.split(charBuffer);

        System.out.println("info0: "+fileInfo[0]);
        System.out.println("info1: "+fileInfo[1]);

        remaining = Long.parseLong(fileInfo[1]);

        bytesType = BytesTypeToReceive.BODY;
        //tell client ready for file data
        sendResponse("[readyForBody]");
    } catch (Exception e) {
        System.out.println("Exception for checkForData. No more data?");
        System.out.println(e.getMessage());
    }
}

/**
 * Reads the bytes from socket and writes to file
 *
 * @param socketChannel
 */
//private void readFileFromSocket(SocketChannel socketChannel, int infoLength) {
private void receiveBody() throws Exception {
    int current;
    System.out.println("About to receive "+remaining+" bytes.");
    try {
        //read file data
        aFile = new RandomAccessFile("C:\folder\to\save\to\"+fileInfo[0], "rw");
        byteBuffer = ByteBuffer.allocate(131072);
        FileChannel fileChannel = aFile.getChannel();
        while (((current = client.read(byteBuffer)) > 0 || byteBuffer.position() > 0) && remaining > 0) {
            remaining = remaining - (long)current;
            System.out.println(current+" "+remaining);  
            byteBuffer.flip();
            fileChannel.write(byteBuffer);
            byteBuffer.compact();
        }
        fileChannel.close();
        aFile.close();      
        System.out.println(current +" - End of file");
        bytesType = BytesTypeToReceive.HEADER;
        sendResponse("[readyForHeader]");
    } catch (FileNotFoundException e) {
        System.out.println("FILE NOT FOUND EXCEPTION");
        e.getMessage();
    } catch (IOException e) {
        System.out.println("IO EXCEPTION");
        e.getMessage();
    } catch (InterruptedException e) {
        System.out.println("INTERRUPTED EXCEPTION");
        e.getMessage();
    }
}
void sendResponse(String response) throws Exception {
    System.out.println("Sending response: "+response);
    byte[] data = response.getBytes("UTF-8");
    responseBuffer = ByteBuffer.wrap(data);
    client.write(responseBuffer);
    responseBuffer.rewind();

}
public void close() {
    try {
        client.close();
        server.close();
        System.out.println("connection closed");
    } catch (IOException e) {
        e.printStackTrace();
    }

}

此处唯一的文件大小限制是您自己的代码。您以 4 字节的整数形式发送文件大小。

使用长。

注意您不需要分配巨大的缓冲区。您甚至不需要发送文件大小,除了检查。您可以使用 32-64k 数量级的缓冲区大小。你抄代码就好了

这可能是问题所在:

while (((current = client.read(byteBuffer)) > 0

由于您的套接字配置为非阻塞,并且您在输入上没有任何选择,它将快速消耗传入数据并停止读取 return -1 或 0。

实际上客户端有类似的问题,但它会在尝试将数据推送到过载的套接字时烧毁 CPU。

问题出在这里:

server.configureBlocking(假);

服务器应该处于阻塞模式,因为当套接字仍在从缓冲区读取数据时,您不应该将数据写入缓冲区。