在 Java 中读取文件的前 N ​​个字节作为 InputStream?

Reading first N bytes of a file as an InputStream in Java?

我这辈子都找不到与我正在尝试做的问题相匹配的问题,所以我将在这里解释我的用例。如果您知道某个主题已经涵盖了此问题的答案,请随时将我引导至该主题。 :)

我有一段代码可以定期(每 20 秒)将文件上传到 Amazon S3。该文件是一个正在被另一个进程写入的日志文件,所以这个函数实际上是一种跟踪日志的方法,这样某人就可以半实时地读取它的内容,而不必直接访问日志所在的机器.

直到最近,我一直在使用 S3 PutObject 方法(使用文件作为输入)来执行此上传。但在 AWS SDK 1.9 中,这不再有效,因为如果实际上传的内容大小大于上传开始时承诺的内容长度,S3 客户端将拒绝请求。此方法在开始流式传输数据之前读取文件的大小,因此鉴于此应用程序的性质,文件的大小很可能在该点和流的末尾之间增加。这意味着我现在需要确保无论文件有多大我只发送 N 个字节的数据。

我不需要以任何方式解释文件中的字节,所以我不关心编码。我可以逐字节传输它。基本上,我想要的是一个简单的方法,我可以读取文件直到第 N 个字节,然后终止读取,即使文件中的数据超过了该点。 (换句话说,在特定点将 EOF 插入流中。)

例如,如果我的文件在开始上传时为 10000 字节长,但在上传过程中增长到 12000 字节,我想在达到 10000 字节时停止上传,无论大小如何变化。 (在后续上传时,我会上传 12000 字节或更多。)

我还没有找到一个预制的方法来做到这一点 - 到目前为止我发现的最好的方法似乎是 IOUtils.copyLarge(InputStream, OutputStream, offset, length),它可以被告知将最多 "length" 字节复制到提供的 OutputStream。但是,copyLarge 是一种阻塞方法,PutObject 也是(它大概在其 InputStream 上调用了一种形式的 read()),所以我似乎根本无法让它工作。

我还没有找到可以执行此操作的任何方法或预构建流,所以这让我觉得我需要编写自己的实现来直接监控已读取的字节数。这可能会像 BufferedInputStream 一样工作,其中每批读取的字节数是缓冲区大小或要读取的剩余字节中的较小者。 (例如,缓冲区大小为 3000 字节,我会做三批,每批 3000 字节,然后是 1000 字节 + EOF 的批。)

有谁知道更好的方法吗?谢谢

编辑 澄清一下,我已经知道几个替代方案,但都不是理想的选择:

(1) 我可以在上传文件时锁定文件。这样做会导致写入文件的过程中出现数据丢失或操作问题。

(2) 我可以在上传文件之前创建文件的本地副本。这可能是非常低效的,并且会占用很多不必要的磁盘 space(这个文件可以增长到几千兆字节的范围,而它 运行 所在的机器可能磁盘不足 space).

编辑 2: 根据同事的建议,我的最终解决方案如下所示:

private void uploadLogFile(final File logFile) {
    if (logFile.exists()) {
        long byteLength = logFile.length();
        try (
            FileInputStream fileStream = new FileInputStream(logFile);
            InputStream limitStream = ByteStreams.limit(fileStream, byteLength);
        ) {
            ObjectMetadata md = new ObjectMetadata();
            md.setContentLength(byteLength);
            // Set other metadata as appropriate.
            PutObjectRequest req = new PutObjectRequest(bucket, key, limitStream, md);
            s3Client.putObject(req);
        } // plus exception handling
    }
}

LimitInputStream 是我的同事建议的,显然不知道它已被弃用。 ByteStreams.limit 是当前的 Guava 替代品,它可以满足我的要求。谢谢大家。

完整答案撕换:

包装一个 InputStream 相对简单,例如在发出数据结束信号之前限制它将传送的字节数。 FilterInputStream 是针对这种一般类型的工作,但由于您必须为这种 特定的 工作覆盖几乎所有方法,所以它只会妨碍。

这里是粗略的解决方案:

import java.io.IOException;
import java.io.InputStream;

/**
 * An {@code InputStream} wrapper that provides up to a maximum number of
 * bytes from the underlying stream.  Does not support mark/reset, even
 * when the wrapped stream does, and does not perform any buffering.
 */
public class BoundedInputStream extends InputStream {

    /** This stream's underlying @{code InputStream} */
    private final InputStream data;

    /** The maximum number of bytes still available from this stream */ 
    private long bytesRemaining;

    /**
     * Initializes a new {@code BoundedInputStream} with the specified
     * underlying stream and byte limit
     * @param data the @{code InputStream} serving as the source of this
     *        one's data
     * @param maxBytes the maximum number of bytes this stream will deliver
     *        before signaling end-of-data
     */
    public BoundedInputStream(InputStream data, long maxBytes) {
        this.data = data;
        bytesRemaining = Math.max(maxBytes, 0);
    }

    @Override
    public int available() throws IOException {
        return (int) Math.min(data.available(), bytesRemaining);
    }

    @Override
    public void close() throws IOException {
        data.close();
    }

    @Override
    public synchronized void mark(int limit) {
        // does nothing
    }

    @Override
    public boolean markSupported() {
        return false;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        if (bytesRemaining > 0) {
            int nRead = data.read(
                    buf, off, (int) Math.min(len, bytesRemaining));

            bytesRemaining -= nRead;

            return nRead;
        } else {
            return -1;
        }
    }

    @Override
    public int read(byte[] buf) throws IOException {
        return this.read(buf, 0, buf.length);
    }

    @Override
    public synchronized void reset() throws IOException {
        throw new IOException("reset() not supported");
    }

    @Override
    public long skip(long n) throws IOException {
        long skipped = data.skip(Math.min(n, bytesRemaining));

        bytesRemaining -= skipped;

        return skipped;
    }

    @Override
    public int read() throws IOException {
        if (bytesRemaining > 0) {
            int c = data.read();

            if (c >= 0) {
                bytesRemaining -= 1;
            }

            return c;
        } else {
            return -1;
        }
    }
}