读取包含二进制格式文件的大文件并以最小堆分配提取这些文件

Reading large files containing files in binary format and extracting those files with minimum heap allocation

对不起标题,它可能有点令人困惑,但我不知道如何更好地解释它。

有两个扩展名为 .cat(目录文件)和 .dat 的文件。 .cat 文件包含 .dat 文件中二进制文件的信息。此信息是文件名、大小、.dat 文件中的偏移量和 md5 哈希值。

.cat 文件示例;

assets/textures/environments/asteroids/ast_crystal_blue_diff-small.gz 22387 1546955265 85a67a982194e4141e08fac4bf062c8f
assets/textures/environments/asteroids/ast_crystal_blue_diff.gz 83859 1546955265 86c7e940de82c2c2573a822c9efc9b6b
assets/textures/environments/asteroids/ast_crystal_diff-small.gz 22693 1546955265 cff6956c94b59e946b78419d9c90f972
assets/textures/environments/asteroids/ast_crystal_diff.gz 85531 1546955265 57d5a24dd4da673a42cbf0a3e8e08398
assets/textures/environments/asteroids/ast_crystal_green_diff-small.gz 22312 1546955265 857fea639e1af42282b015e8decb02db
assets/textures/environments/asteroids/ast_crystal_green_diff.gz 115569 1546955265 ee6f60b0a8211ec048172caa762d8a1a
assets/textures/environments/asteroids/ast_crystal_purple_diff-small.gz 14179 1546955265 632317951273252d516d36b80de7dfcd
assets/textures/environments/asteroids/ast_crystal_purple_diff.gz 53781 1546955265 c057acc06a4953ce6ea3c6588bbad743
assets/textures/environments/asteroids/ast_crystal_yellow_diff-small.gz 21966 1546955265 a893c12e696f9e5fb188409630b8d10b
assets/textures/environments/asteroids/ast_crystal_yellow_diff.gz 82471 1546955265 c50a5e59093fe9c6abb64f0f47a26e57
assets/textures/environments/asteroids/xen_crystal_diff-small.gz 14161 1546955265 23b34bdd1900a7e61a94751ae798e934
assets/textures/environments/asteroids/xen_crystal_diff.gz 53748 1546955265 dcb7c8294ef72137e7bca8dd8ea2525f
assets/textures/lensflares/lens_rays3_small_diff.gz 14107 1546955265 a656d1fad4198b0662a783919feb91a5

我确实相对轻松地解析了这些文件,并且我使用了 Span<T>,并且在使用 BenchmarkDotNet 进行了一些基准测试之后,我相信我已经尽可能地优化了这些类型文件的读取。

但是 .dat 文件是另一回事。典型的 .dat 文件大小为 GB。

我首先尝试了我能想到的最直接的方法。

(我删除了空检查和验证代码以使代码更具可读性。)

public async Task ExportAssetsAsync(CatalogFile catalogFile, string destDirectory, CancellationToken ct = default)
{
    IFileInfo catalogFileInfo = _fs.FileInfo.FromFileName(catalogFile.FilePath);
    string catalogFileName = _fs.Path.GetFileNameWithoutExtension(catalogFileInfo.Name);
    string datFilePath = _fs.Path.Combine(catalogFileInfo.DirectoryName, $"{catalogFileName}.dat");
    IFileInfo datFileInfo = _fs.FileInfo.FromFileName(datFilePath);

    await using Stream stream = datFileInfo.OpenRead();
    
    foreach (CatalogEntry catalogEntry in catalogFile.CatalogEntries)
    {
        string destFilePath = _fs.Path.Combine(destDirectory, catalogEntry.AssetPath);
        IFileInfo destFile = _fs.FileInfo.FromFileName(destFilePath);
        if (!destFile.Directory.Exists)
        {
            destFile.Directory.Create();
        }
        stream.Seek(catalogEntry.ByteOffset, SeekOrigin.Begin);
        var newFileData = new byte[catalogEntry.AssetSize];
        int read = await stream.ReadAsync(newFileData, 0, catalogEntry.AssetSize, ct);
        if (read != catalogEntry.AssetSize)
        {
            _logger?.LogError("Could not read asset data from dat file: {DatFile}", datFilePath);
            throw new DatFileReadException("Could not read asset data from dat file", datFilePath);
        }
        await using Stream destStream = _fs.File.Open(destFile.FullName, FileMode.Create);
        destStream.Write(newFileData);
        destStream.Close();
    }
}

正如您所猜到的那样,这种方法既慢又在堆中分配大量资源,这让 GC 一直很忙。

我对上面的方法做了一些修改,尝试使用缓冲区读取,然后使用 stackalloc 和 Span 而不是使用 new byte[catalogEntry.AssetSize] 进行分配。我在缓冲读取中没有获得太多,自然地,我得到了 stackalloc 的 Whosebug 异常,因为有些文件大于堆栈大小。

然后经过一番研究,我决定可以使用 .NET Core 2.1 引入的 System.IO.Pipelines。我把上面的方法改成下面这样。

public async Task ExportAssetsPipe(CatalogFile catalogFile, string destDirectory, CancellationToken ct = default)
{
    IFileInfo catalogFileInfo = _fs.FileInfo.FromFileName(catalogFile.FilePath);
    string catalogFileName = _fs.Path.GetFileNameWithoutExtension(catalogFileInfo.Name);
    string datFilePath = _fs.Path.Combine(catalogFileInfo.DirectoryName, $"{catalogFileName}.dat");

    IFileInfo datFileInfo = _fs.FileInfo.FromFileName(datFilePath);
    
    await using Stream stream = datFileInfo.OpenRead();

    foreach (CatalogEntry catalogEntry in catalogFile.CatalogEntries)
    {
        string destFilePath = _fs.Path.Combine(destDirectory, catalogEntry.AssetPath);
        IFileInfo destFile = _fs.FileInfo.FromFileName(destFilePath);
        if (!destFile.Directory.Exists)
        {
            destFile.Directory.Create();
        }
        stream.Position = catalogEntry.ByteOffset;
        var reader = PipeReader.Create(stream);
        while (true)
        {
            ReadResult readResult = await reader.ReadAsync(ct);
            ReadOnlySequence<byte> buffer = readResult.Buffer;
            if (buffer.Length >= catalogEntry.AssetSize)
            {
                ReadOnlySequence<byte> entry = buffer.Slice(0, catalogEntry.AssetSize);
                await using Stream destStream = File.Open(destFile.FullName, FileMode.Create);
                foreach (ReadOnlyMemory<byte> mem in entry)
                {
                   await destStream.WriteAsync(mem, ct);
                }
                destStream.Close();
                break;
            }
            reader.AdvanceTo(buffer.Start, buffer.End);
        }
    }
}

根据 BenchmarkDotnet,结果在性能和内存分配方面都比第一种方法差。这可能是因为我错误地或无意地使用了 System.IO.Pipelines。

我没有太多这方面的经验,因为我以前没有对这么大的文件进行过 I/O 操作。我怎样才能用最少的内存分配和最大的性能做我想做的事?非常感谢您的帮助和正确指导。

使用 System.Buffers 上的新 ArrayPool,首先研究如何使用它来避免内存泄漏。

您需要始终从 return 租用池,这对内存分配有很大帮助。 –

试试这个 link adamsitnik。com/Array-Pool 进行研究

首先,感谢 Mauricio Atanache 和 Alexei Levenkov 的建议。在尝试他们建议的方法时,我学到了很多东西。在完成基准测试后,我决定继续使用 Alexei Levenkov 建议的 SubStream 和 Stream.CopyTo 方法。

首先我想分享解决方案。之后,有好奇心的可以查看基准和结果。

解决方案

Alexei 向我指出了一个老问题,我查看了那里的解决方案并将其改编为我自己的代码。

How to expose a sub section of my stream to a user

首先,我需要一个 SubStream 实现,基本上我想做的是从一个大的 .dat 文件中提取小文件。通过使用 SubStream,我可以将文件封装在我想要的 FileStream 偏移量处。然后,使用Stream.Copy方法,我可以将SubStream中的内容复制到另一个FileStream中,写入文件系统。使用这种方法,我只进行一次缓冲区分配。

public class SubStream : Stream
{
    private readonly Stream _baseStream;
    private readonly long _length;
    private long _position;

    public SubStream(Stream baseStream, long offset, long length)
    {
        if (baseStream == null)
        {
            throw new ArgumentNullException(nameof(baseStream), "Base stream cannot be null");
        }

        if (!baseStream.CanRead)
        {
            throw new ArgumentException("Base stream must be readable.", nameof(baseStream));
        }

        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(offset));
        }

        _baseStream = baseStream;
        _length = length;

        if (baseStream.CanSeek)
        {
            baseStream.Seek(offset, SeekOrigin.Current);
        }
        else
        {
            // read it manually...
            const int bufferSize = 512;
            var buffer = new byte[bufferSize];
            while (offset > 0)
            {
                int read = baseStream.Read(buffer, 0, offset < bufferSize ? (int)offset : bufferSize);
                offset -= read;
            }
        }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        CheckDisposed();
        long remaining = _length - _position;
        if (remaining <= 0)
        {
            return 0;
        }

        if (remaining < count)
        {
            count = (int)remaining;
        }
        
        int read = _baseStream.Read(buffer, offset, count);
        _position += read;
        
        return read;
    }

    private void CheckDisposed()
    {
        if (_baseStream == null)
        {
            throw new ObjectDisposedException(GetType().Name);
        }
    }

    public override long Length
    {
        get
        {
            CheckDisposed();
            return _length;
        }
    }

    public override bool CanRead
    {
        get
        {
            CheckDisposed();
            return true;
        }
    }

    public override bool CanWrite
    {
        get
        {
            CheckDisposed();
            return false;
        }
    }

    public override bool CanSeek
    {
        get
        {
            CheckDisposed();
            return false;
        }
    }

    public override long Position
    {
        get
        {
            CheckDisposed();
            return _position;
        }
        set => throw new NotSupportedException();
    }

    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();

    public override void SetLength(long value) => throw new NotSupportedException();

    public override void Write(byte[] buffer, int offset, int count) => throw new NotImplementedException();

    public override void Flush()
    {
        CheckDisposed();
        _baseStream.Flush();
    }
}

最终版本的方法如下

private static void ExportAssets(CatalogFile catalogFile, string destDirectory)
{
    FileInfo catalogFileInfo = new FileInfo(catalogFile.FilePath);
    string catalogFileName = Path.GetFileNameWithoutExtension(catalogFileInfo.Name);
    string datFilePath = Path.Combine(catalogFileInfo.DirectoryName, $"{catalogFileName}.dat");
    FileInfo datFileInfo = new FileInfo(datFilePath);

    using Stream stream = datFileInfo.OpenRead();
    foreach (CatalogEntry catalogEntry in catalogFile.CatalogEntries)
    {
        string destFilePath = Path.Combine(destDirectory, catalogEntry.AssetPath);
        FileInfo destFile = new FileInfo(destFilePath);

        if (!destFile.Directory.Exists)
        {
            destFile.Directory.Create();
        }

        using var subStream = new SubStream(stream, catalogEntry.ByteOffset, catalogEntry.AssetSize);
        using Stream destStream = File.Open(destFile.FullName, FileMode.Create);
        subStream.CopyTo(destStream);
        destStream.Close();
    }
}

基准设置

我在做基准测试时使用的设置

  • 我使用了两个单独的 .dat 文件,一个 600KB,另一个 550MB。
  • 在基准测试中,对文件系统的写入操作导致结果波动。相反,我使用 MemoryStream 来模拟写入操作。
  • 我在基准测试中包含了同步和异步版本的方法。
  • 我正在使用 System.IO.Abstractions 库模拟单元测试的文件 IO 操作。不要被以 Fs. 开头的方法调用混淆(例如 Fs.FileInfo.FromFileName(catalogFile.FilePath))。

三个不同版本的方法用于基准测试。

第一个是未优化的版本,它为 .dat 文件中的每个子文件分配 new byte[]

private static void ExportAssetsUnoptimized(CatalogFile catalogFile, string destDirectory)
{
    IFileInfo catalogFileInfo = Fs.FileInfo.FromFileName(catalogFile.FilePath);
    string catalogFileName = Fs.Path.GetFileNameWithoutExtension(catalogFileInfo.Name);
    string datFilePath = Fs.Path.Combine(catalogFileInfo.DirectoryName, $"{catalogFileName}.dat");
    IFileInfo datFileInfo = Fs.FileInfo.FromFileName(datFilePath);

    using Stream stream = datFileInfo.OpenRead();

    foreach (CatalogEntry catalogEntry in catalogFile.CatalogEntries)
    {
        string destFilePath = Fs.Path.Combine(destDirectory, catalogEntry.AssetPath);
        IFileInfo destFile = Fs.FileInfo.FromFileName(destFilePath);

        if (!destFile.Directory.Exists)
        {
            // destFile.Directory.Create();
        }

        stream.Seek(catalogEntry.ByteOffset, SeekOrigin.Begin);
        var newFileData = new byte[catalogEntry.AssetSize];
        int read = stream.Read(newFileData, 0, catalogEntry.AssetSize);

        if (read != catalogEntry.AssetSize)
        {
            throw new DatFileReadException("Could not read asset data from dat file", datFilePath);
        }

        // using Stream destStream = Fs.File.Open(destFile.FullName, FileMode.Create);
        using var destStream = new MemoryStream();
        destStream.Write(newFileData);
        destStream.Close();
    }
}

第二个是System.Buffer中的ArrayPool(由Mauricio Atanache建议)。 ArrayPool<T> 是一个 high-performance 托管数组池。您可以在 System.Buffers 包中找到它,其源代码可在 GitHub 上找到。它已经成熟,可以在生产中使用了。

有一篇很好的文章详细解释了这个主题。

Pooling large arrays with ArrayPool

我仍然怀疑我没有正确使用它或未达到预期目的。但是当我如下使用它时,我观察到它比上面未优化的版本工作得更快并且分配了一半。

private static void ExportAssetsWithArrayPool(CatalogFile catalogFile, string destDirectory)
{
    IFileInfo catalogFileInfo = Fs.FileInfo.FromFileName(catalogFile.FilePath);
    string catalogFileName = Fs.Path.GetFileNameWithoutExtension(catalogFileInfo.Name);
    string datFilePath = Fs.Path.Combine(catalogFileInfo.DirectoryName, $"{catalogFileName}.dat");
    IFileInfo datFileInfo = Fs.FileInfo.FromFileName(datFilePath);

    ArrayPool<byte> bufferPool = ArrayPool<byte>.Shared;

    using Stream stream = datFileInfo.OpenRead();
    foreach (CatalogEntry catalogEntry in catalogFile.CatalogEntries)
    {
        string destFilePath = Fs.Path.Combine(destDirectory, catalogEntry.AssetPath);
        IFileInfo destFile = Fs.FileInfo.FromFileName(destFilePath);

        if (!destFile.Directory.Exists)
        {
            //destFile.Directory.Create();
        }

        stream.Seek(catalogEntry.ByteOffset, SeekOrigin.Begin);
        byte[] newFileData = bufferPool.Rent(catalogEntry.AssetSize);
        int read = stream.Read(newFileData, 0, catalogEntry.AssetSize);

        if (read != catalogEntry.AssetSize)
        {
            throw new DatFileReadException("Could not read asset data from dat file", datFilePath);
        }

        // using Stream destStream = Fs.File.Open(destFile.FullName, FileMode.Create);
        using Stream destStream = new MemoryStream();
        destStream.Write(newFileData, 0, catalogEntry.AssetSize);
        destStream.Close();
        bufferPool.Return(newFileData);
    }
}

而第三个是最快和最少的 memory-allocated 版本。第三个是最快和最少的 memory-allocated 版本。至少 memory-allocated 我的意思是分配的内存少了约 75 倍,而且速度明显更快。

我在回答开头已经给出了该方法的代码示例并进行了说明。所以,我要跳到基准测试结果。

您可以从下面的要点 link 访问完整的 Benchmarkdotnet 设置。

https://gist.github.com/Blind-Striker/8f7e8ff56de6d9c2a4ab7a47ae423eba

基准测试结果

Method FileSize Mean Error StdDev Gen 0 Gen 1 Gen 2 Allocated
ExportAssetsUnoptimized_Benchmark Large_5GB 563,034.4 us 13,290.13 us 38,977.64 us 140000.0000 140000.0000 140000.0000 1,110,966 KB
ExportAssetsWithArrayPool_Benchmark Large_5GB 270,394.1 us 5,308.29 us 6,319.15 us 5500.0000 4000.0000 4000.0000 555,960 KB
ExportAssetsSubStream_Benchmark Large_5GB 17,525.8 us 183.55 us 171.69 us 3468.7500 3468.7500 3468.7500 14,494 KB
ExportAssetsUnoptimizedAsync_Benchmark Large_5GB 574,430.4 us 20,442.46 us 59,954.20 us 133000.0000 133000.0000 133000.0000 1,111,298 KB
ExportAssetsWithArrayPoolAsync_Benchmark Large_5GB 237,256.6 us 5,673.63 us 16,728.82 us 1500.0000 - - 556,088 KB
ExportAssetsSubStreamAsync_Benchmark Large_5GB 32,766.5 us 636.08 us 732.51 us 3187.5000 2562.5000 2562.5000 15,186 KB
ExportAssetsUnoptimized_Benchmark Small_600KB 680.4 us 13.24 us 23.20 us 166.0156 124.0234 124.0234 1,198 KB
ExportAssetsWithArrayPool_Benchmark Small_600KB 497.9 us 7.54 us 7.06 us 124.5117 62.0117 62.0117 605 KB
ExportAssetsSubStream_Benchmark Small_600KB 332.0 us 4.87 us 4.32 us 26.8555 26.8555 26.8555 223 KB
ExportAssetsUnoptimizedAsync_Benchmark Small_600KB 739.2 us 5.98 us 5.30 us 186.5234 124.0234 124.0234 1,200 KB
ExportAssetsWithArrayPoolAsync_Benchmark Small_600KB 604.9 us 6.99 us 6.54 us 124.0234 61.5234 61.5234 607 KB
ExportAssetsSubStreamAsync_Benchmark Small_600KB 496.6 us 8.02 us 6.70 us 26.8555 26.8555 26.8555 228 KB

结论与免责声明

我得出的结论是 SubStream 和 Stream.CopyTo 方法分配的内存少得多,运行速度快得多。可能有些分配是因为 Path.Combine.

不过我想提醒您,在我将这个问题发布到 Whosebug 上之前,我并没有使用过 ArrayPool。有可能我没有正确使用它或未达到预期目的。我也不确定使用 MemoryStream 而不是 FileStream 作为写入目标以保持基准一致的准确性。