写入具有多个流的文件 C#

Writing To A File With Multiple Streams C#

我正在尝试使用 HTTP 将一个大文件 (>1GB) 从一台服务器下载到另一台服务器。为此,我并行发出 HTTP 范围请求。这让我可以并行下载文件。

当保存到磁盘时,我正在获取每个响应流,打开同一个文件作为文件流,寻找我想要的范围然后写入。

但是我发现除了一个响应流外,所有响应流都超时了。 看起来 磁盘I/O 跟不上网络I/O。但是,如果我做同样的事情,但让每个线程写入一个单独的文件,它就可以正常工作。

作为参考,这是我写入同一文件的代码:

int numberOfStreams = 4;
List<Tuple<int, int>> ranges = new List<Tuple<int, int>>();
string fileName = @"C:\MyCoolFile.txt";
//List populated here
Parallel.For(0, numberOfStreams, (index, state) =>
{
    try
    {
        HttpWebRequest webRequest = (HttpWebRequest)WebRequest.Create("Some URL");
        using(Stream responseStream = webRequest.GetResponse().GetResponseStream())
        {
            using (FileStream fileStream = File.Open(fileName, FileMode.OpenOrCreate, FileAccess.Write, FileShare.Write))
            {
                fileStream.Seek(ranges[index].Item1, SeekOrigin.Begin);
                byte[] buffer = new byte[64 * 1024];
                int bytesRead;
                while ((bytesRead = responseStream.Read(buffer, 0, buffer.Length)) > 0)
                {
                    if (state.IsStopped)
                    {
                        return;
                    }
                    fileStream.Write(buffer, 0, bytesRead);
                }
            }
        };
    }
    catch (Exception e)
    {
        exception = e;
        state.Stop();
    }
});

这里是写入多个文件的代码:

int numberOfStreams = 4;
List<Tuple<int, int>> ranges = new List<Tuple<int, int>>();
string fileName = @"C:\MyCoolFile.txt";
//List populated here
Parallel.For(0, numberOfStreams, (index, state) =>
{
    try
    {
        HttpWebRequest webRequest = (HttpWebRequest)WebRequest.Create("Some URL");
        using(Stream responseStream = webRequest.GetResponse().GetResponseStream())
        {
            using (FileStream fileStream = File.Open(fileName + "." + index + ".tmp", FileMode.OpenOrCreate, FileAccess.Write, FileShare.Write))
            {
                fileStream.Seek(ranges[index].Item1, SeekOrigin.Begin);
                byte[] buffer = new byte[64 * 1024];
                int bytesRead;
                while ((bytesRead = responseStream.Read(buffer, 0, buffer.Length)) > 0)
                {
                    if (state.IsStopped)
                    {
                        return;
                    }
                    fileStream.Write(buffer, 0, bytesRead);
                }
            }
        };
    }
    catch (Exception e)
    {
        exception = e;
        state.Stop();
    }
});

我的问题是,当从多个线程写入单个文件时,C#/Windows 是否有一些额外的 checks/actions 会导致文件 I/O 变慢比写入多个文件时?所有磁盘操作都应该受磁盘速度的约束,对吗?谁能解释这种行为?

提前致谢!

更新:这是 源服务器 抛出的错误:

"Unable to write data to the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond." [System.IO.IOException]:"Unable to write data to the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond." 内部异常:"A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond" 消息:"Unable to write data to the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond." StackTrace:“在 System.Net.Sockets.NetworkStream.Write(字节 [] 缓冲区,Int32 偏移量,Int32 大小)\r\n 在 System.Net.Security._SslStream.StartWriting(字节 [] 缓冲区,Int32 偏移量,Int32 计数,AsyncProtocolRequest asyncRequest)\r\n 在 System.Net.Security._SslStream.ProcessWrite(字节 [] 缓冲区,Int32 偏移量,Int32 计数,AsyncProtocolRequest asyncRequest)\r\n 在 System.Net.Security.SslStream.Write(字节 [] 缓冲区,Int32 偏移量,Int32 计数)\r\n

除非您正在写入条带化 RAID,否则您不太可能通过同时从多个线程写入文件来体验性能优势。事实上,它更有可能是相反的——并发写入会交错并导致随机访问,导致磁盘寻道延迟,使它们比大型顺序写入慢几个数量级。

要获得透视感,请看一些 latency comparisons。从磁盘连续读取 1 MB 需要 20 毫秒;写入大约需要相同的时间。另一方面,每次磁盘寻道大约需要 10 毫秒。如果您的写入以 4 KB 块交错,那么您的 1 MB 写入将需要额外的 2560 毫秒寻道时间,使其比顺序写入慢 100 倍。

我建议在任何时候只允许一个线程写入文件,并只为网络传输使用并行性。您可以使用生产者-消费者模式,其中下载的块被写入有界并发集合(例如 BlockingCollection<T>),然后由专用线程获取并写入磁盘。

根据目前提供的信息,这是我的猜测:

在 Windows 上,当您写入扩展文件大小的位置时 Windows 需要零初始化它之前的所有内容。这可以防止旧磁盘数据泄漏,这将是一个安全问题。

可能,除了您的第一个线程之外,所有线程都需要对如此多的数据进行零初始化,以致于下载超时。这不再是真正的流式传输,因为第一次写入需要很长时间。

如果您拥有 LPIM 权限,则可以避免零初始化。否则你不能出于安全原因。免费下载管理器显示一条消息,表明它在每次下载开始时开始零初始化。

    fileStream.Seek(ranges[index].Item1, SeekOrigin.Begin);

Seek() 调用有问题,您将查找文件中距当前文件末尾很远的部分。您的下一个 fileStream.Write() 调用会强制文件系统扩展磁盘上的文件,用零填充未写入的部分。

这可能需要一段时间,您的线程将被阻塞,直到文件系统完成文件扩展。可能足够长以触发超时。你会在 t运行sfer 开始时看到这个问题。

解决方法是创建填充整个文件,然后开始写入真实数据。否则下载器使用的一种非常常见的策略,您之前可能已经看到过 .part 文件。另一个好处是你有一个不错的 gua运行 tee,t运行sfer 不会因为磁盘 运行 out of space 而失败。请注意,只有当机器有足够的 RAM 时,用零填充文件才便宜。 1 GB 在现代机器上应该不是问题。

复制代码:

using System;
using System.IO;
using System.Diagnostics;

class Program {
    static void Main(string[] args) {
        string path = @"c:\temp\test.bin";
        var fs = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.Write);
        fs.Seek(1024L * 1024 * 1024, SeekOrigin.Begin);
        var buf = new byte[4096];
        var sw = Stopwatch.StartNew();
        fs.Write(buf, 0, buf.Length);
        sw.Stop();
        Console.WriteLine("Writing 4096 bytes took {0} milliseconds", sw.ElapsedMilliseconds);
        Console.ReadKey();
        fs.Close();
        File.Delete(path);
    }
}

输出:

Writing 4096 bytes took 1491 milliseconds

那是在快速 SSD 上,主轴驱动器将花费 很多 的时间。

System.Net.Sockets.NetworkStream.Write

堆栈跟踪显示错误发生在写入服务器时。这是一个超时。这可能是因为

  1. 网络failure/overloading
  2. 服务器无响应。

这不是写入文件的问题。分析网络和服务器。可能服务器还没有准备好并发使用。

通过禁用写入文件来证明这个理论。错误应该仍然存在。

因此,在尝试了所有建议后,我最终使用 MemoryMappedFile 并打开一个流以在每个线程上写入 MemoryMappedFile

int numberOfStreams = 4;
List<Tuple<int, int>> ranges = new List<Tuple<int, int>>();
string fileName = @"C:\MyCoolFile.txt";
//Ranges list populated here
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(fileName, FileMode.OpenOrCreate, null, fileSize.Value, MemoryMappedFileAccess.ReadWrite))
{
    Parallel.For(0, numberOfStreams, index =>
    {
        try
        {
            HttpWebRequest webRequest = (HttpWebRequest)WebRequest.Create("Some URL");
            using(Stream responseStream = webRequest.GetResponse().GetResponseStream())
            {
                using (MemoryMappedViewStream fileStream = mmf.CreateViewStream(ranges[index].Item1, ranges[index].Item2 - ranges[index].Item1 + 1, MemoryMappedFileAccess.Write))
                {
                    responseStream.CopyTo(fileStream);
                }
            };
        }
        catch (Exception e)
        {
            exception = e;
        }
    });
}