如何定期将 c# FileStream 刷新到磁盘?

How to periodically flush c# FileStream to the disk?

上下文

我正在为一个 Web API 项目实现一个日志记录机制,该项目通过多种方法将序列化对象写入文件,然后由外部进程读取(nxLog 更准确)。该应用程序托管在 IIS 上并使用 18 个工作进程。应用程序池每天回收一次。包含日志记录方法的服务的预期负载为 10,000 req/s。简而言之,这是一个 classic produces/consumer 问题,涉及多个生产者(生成日志的方法)和一个消费者(从日志文件读取的外部进程)。 更新:每个进程也使用多线程。

我使用 BlockingCollection 存储数据(并解决竞争条件)和一个长 运行 任务,将集合中的数据写入磁盘。

要写入磁盘,我使用 StreamWriterFileStream
因为写入频率几乎是恒定的(正如我所说的每秒 10,000 次写入),我决定在应用程序池的整个生命周期内保持流打开,并定期将日志写入磁盘。我每天依靠应用程序池回收和我的 DI 框架来处理我的记录器。另请注意,此 class 将是单例,因为我不希望有超过一个线程专用于从我的线程池中写入。

显然,FileStream 对象在被释放之前不会写入磁盘。现在我不希望 FileStream 在写入磁盘之前等待一整天。保存所有序列化对象所需的内存将是巨大的,更不用说应用程序或服务器上的任何崩溃都会导致数据丢失或文件损坏。

现在我的问题

如何让底层流(FileStream 和 StreamWriter)定期写入磁盘而不释放它们?我最初的假设是,一旦 FileSteam 超过其缓冲区大小(默认情况下为 4K),它将写入磁盘。

更新: 答案中提到的不一致已经修复。

代码:

public class EventLogger: IDisposable, ILogger
{
    private readonly BlockingCollection<List<string>> _queue;
    private readonly Task _consumerTask;
    private FileStream _fs;
    private StreamWriter _sw;
    public EventLogger()
    {            
        OpenFile();
        _queue = new BlockingCollection<List<string>>(50);
        _consumerTask = Task.Factory.StartNew(Write, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }
    private void OpenFile()
    {
        _fs?.Dispose();
        _sw?.Dispose();            
        _logFilePath = $"D:\Log\log{DateTime.Now.ToString(yyyyMMdd)}{System.Diagnostic.Process.GetCurrentProcess().Id}.txt";
        _fs = new FileStream(_logFilePath, FileMode.Append, FileAccess.Write, FileShare.ReadWrite);
        _sw = new StreamWriter(_fs);
    }
    public void Dispose()
    {            
         _queue?.CompleteAdding();
         _consumerTask?.Wait();            
         _sw?.Dispose();
         _fs?.Dispose();
         _queue?.Dispose();            

    }
    public void Log(List<string> list)
    {
        try
        {               
            _queue.TryAdd(list, 100);               

        }
        catch (Exception e)
        {
            LogError(LogLevel.Error, e);
        }
    }
    private void Write()
    {
        foreach (List<string> items in _queue.GetConsumingEnumerable())
        {               
            items.ForEach(item =>
            {                    
                _sw?.WriteLine(item);                    
            });
        }

    }
}

也许我的回答无法解决您的具体问题,但我相信您的场景可能是 memory-mapped files 的一个很好的用例。

Persisted files are memory-mapped files that are associated with a source file on a disk. When the last process has finished working with the file, the data is saved to the source file on the disk. These memory-mapped files are suitable for working with extremely large source files.

这可能非常有趣,因为您将能够从不同的进程(即 IIS 工作进程)进行日志记录,而不会出现锁定问题。参见 MemoryMappedFile.OpenExisting 方法。

此外,您可以记录到非持久性共享内存映射文件,并且使用任务调度程序或 Windows 服务,您可以使用持久性内存映射将未决日志带到它们的最终目的地文件。

由于您的 multi/inter-process 场景,我看到了使用这种方法的巨大潜力。

方法 #2

如果您不想重新发明轮子,我会选择可靠的消息队列,例如 MSMQ(非常基本,但在您的场景中仍然有用)或 RabbitMQ。将日志放入持久队列中,后台进程可能会使用这些日志队列将日志写入文件系统。

这样,您可以每天创建一次、两次或任何时候创建日志文件,并且在系统内记录操作时不会绑定到文件系统。

使用 FileStream.Flush() 方法 - 您可以在每次调用 .Write 后执行此操作。它将清除流的缓冲区并将任何缓冲数据写入文件。

https://msdn.microsoft.com/en-us/library/2bw4h516(v=vs.110).aspx

有几个 "inconsistencies" 有你的问题。

The application is hosted on IIS and uses 18 worker processes

.

_logFilePath = $"D:\Log\log{DateTime.Now.ToString(yyyyMMdd)}{System.Diagnostic.Process.GetCurrentProcess().Id}.txt";

writes serialized objects to a file from multiple methods

将所有这些放在一起,您似乎遇到了单线程情况,而不是多线程情况。由于每个进程都有一个单独的日志,因此不存在争用问题或同步需求。我的意思是,我根本不明白为什么需要 BlockingCollection。您可能忘记提及您的 Web 进程中有多个线程。我将在这里做出这个假设。

另一个问题是您的代码无法编译

  1. class 名称是 LoggerEventLogger 函数看起来像构造函数。
  2. 字符串等一些更不正确的语法

抛开所有这些,如果您确实遇到争用情况并且想通过多个线程或进程写入同一日志,您的 class 似乎拥有您需要的大部分内容。我已经修改了你的 class 来做更多的事情。主要注意以下项目

  1. 修正了所有假设的语法错误
  2. 添加了一个计时器,它会定期调用刷新。这将需要一个 lock 对象以便不中断写操作
  3. StreamWriter 构造函数中使用了明确的缓冲区大小。您应该试探性地确定最适合您的尺寸。此外,您应该从 StreamWriter 中禁用 AutoFlush,这样您就可以让您的写入命中缓冲区而不是文件,从而提供更好的性能。

下面是修改后的代码

public class EventLogger : IDisposable, ILogger {
    private readonly BlockingCollection<List<string>> _queue;
    private readonly Task _consumerTask;
    private FileStream _fs;
    private StreamWriter _sw;
    private System.Timers.Timer _timer;
    private object streamLock = new object();

    private const int MAX_BUFFER = 16 * 1024;      // 16K
    private const int FLUSH_INTERVAL = 10 * 1000;  // 10 seconds

    public  EventLogger() {
        OpenFile();
        _queue = new BlockingCollection<List<string>>(50);
        _consumerTask = Task.Factory.StartNew(Write, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);

    }

    void SetupFlushTimer() {
        _timer = new System.Timers.Timer(FLUSH_INTERVAL);
        _timer.AutoReset = true;
        _timer.Elapsed += TimedFlush;
    }

    void TimedFlush(Object source, System.Timers.ElapsedEventArgs e) {
        _sw?.Flush();
    }

    private void OpenFile() {
        _fs?.Dispose();
        _sw?.Dispose();
        var _logFilePath = $"D:\Log\log{DateTime.Now.ToString("yyyyMMdd")}{System.Diagnostics.Process.GetCurrentProcess().Id}.txt";
        _fs = new FileStream(_logFilePath, FileMode.Append, FileAccess.Write, FileShare.ReadWrite);
        _sw = new StreamWriter(_fs, Encoding.Default, MAX_BUFFER); // TODO: use the correct encoding here
        _sw.AutoFlush = false;
    }

    public void Dispose() {
        _timer.Elapsed -= TimedFlush;
        _timer.Dispose();

        _queue?.CompleteAdding();
        _consumerTask?.Wait();
        _sw?.Dispose();
        _fs?.Dispose();
        _queue?.Dispose();

    }
    public void Log(List<string> list) {
        try {
            _queue.TryAdd(list, 100);

        } catch (Exception e) {
            LogError(LogLevel.Error, e);
        }
    }

    private void Write() {
        foreach (List<string> items in _queue.GetConsumingEnumerable()) {
            lock (streamLock) {
                items.ForEach(item => {
                    _sw?.WriteLine(item);
                });
            }
        }

    }
}

编辑:
控制该机制性能的因素有 4 个,了解它们之间的关系很重要。希望下面的示例可以清楚地说明

假设

  • List<string> 的平均大小为 50 字节
  • Calls/sec 是 10,000
  • MAX_BUFFER 是 1024 * 1024 字节(1 兆字节)

您每秒产生 500,000 字节的数据,因此 1 兆字节的缓冲区只能容纳 2 秒的数据。即,即使 FLUSH_INTERVAL 设置为 10 秒,缓冲区也会在缓冲区 space.

用完时每 2 秒自动刷新一次(平均)

还要记住,盲目地增加 MAX_BUFFER 是没有用的,因为缓冲区更大,实际刷新操作会花费更长的时间。

要了解的主要事情是,当传入数据速率(对您的 EventLog class)和传出数据速率(对磁盘)存在差异时,您要么需要无限大小的缓冲区(假设连续 运行 过程)或者你将不得不放慢你的平均速度。传入速率以匹配平均。传出率