C# - OutOfMemoryException saving a List on a JSON file

I'm trying to save streaming data from a pressure map. I have a pressure matrix defined as:

double[,] pressureMatrix = new double[e.Data.GetLength(0), e.Data.GetLength(1)];

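I receive a new pressureMatrix every 10 milliseconds, and I want to save all of this information in a JSON file so that I can reproduce it later.

What I do is, first, write what I call the header, with all the settings used for the recording, like this: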

recordedData.softwareVersion = Assembly.GetExecutingAssembly().GetName().Version.Major.ToString() + "." + Assembly.GetExecutingAssembly().GetName().Version.Minor.ToString();
recordedData.calibrationConfiguration = calibrationConfiguration;
recordedData.representationConfiguration = representationSettings;
recordedData.pressureData = new List<PressureMap>();

var json = JsonConvert.SerializeObject(recordedData, Formatting.None);

File.WriteAllText(this.filePath, json);

Then, every time I get a new pressure map, I create a new thread that adds the new PressureMap and rewrites the file:

var newPressureMatrix = new PressureMap(datos, DateTime.Now);
recordedData.pressureData.Add(newPressureMatrix);
var json = JsonConvert.SerializeObject(recordedData, Formatting.None);
File.WriteAllText(this.filePath, json);

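After about 20-30 minutes I get an OutOfMemoryException, because the system can no longer hold the recordedData variable: the List<PressureMap> inside it has grown too large.

How can I handle this so that the data gets saved? I would like to record 24-48 hours of information.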

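Your basic problem is that you are holding all your pressure map samples in memory, rather than writing each one individually and then allowing it to be garbage collected. What's worse, you are doing this in two different places:

  1. You serialize your entire list of samples to a JSON string json before writing the string to the file.

    Instead, as explained in Performance Tips: Optimize Memory Usage, you should serialize and deserialize directly to and from your file in such situations. For instructions on how to do this see this answer to Can Json.NET serialize / deserialize to / from a stream? and also Serialize JSON to a file.

  2. recordedData.pressureData = new List<PressureMap>(); accumulates all pressure map samples, and then all of them are written out again every time a new sample is taken.

    A better solution would be to write each sample once and then forget it, but the requirement that every sample be nested inside some container object in the JSON makes it non-obvious how to do that.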
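
For problem #1, serializing directly to the file might look roughly like the sketch below. The class and method names (JsonFileHelper, SerializeToFile, DeserializeFromFile) are hypothetical and do not come from the question or from Json.NET; only the FileStream, StreamWriter, JsonTextWriter, JsonTextReader and JsonSerializer calls are library APIs.

```csharp
using System.IO;
using System.Text;
using Newtonsoft.Json;

// Hypothetical helper, shown only to illustrate stream-based serialization.
public static class JsonFileHelper
{
    // Streams `value` to `path` without first building the whole JSON string in memory.
    public static void SerializeToFile<T>(string path, T value, JsonSerializerSettings settings = null)
    {
        using (var stream = new FileStream(path, FileMode.Create))
        using (var textWriter = new StreamWriter(stream, new UTF8Encoding(false)))
        using (var jsonWriter = new JsonTextWriter(textWriter))
        {
            // CreateDefault accepts null settings and then uses the defaults.
            JsonSerializer.CreateDefault(settings).Serialize(jsonWriter, value);
        }
    }

    // Reads the file back through a JsonTextReader instead of File.ReadAllText.
    public static T DeserializeFromFile<T>(string path, JsonSerializerSettings settings = null)
    {
        using (var stream = File.OpenRead(path))
        using (var textReader = new StreamReader(stream))
        using (var jsonReader = new JsonTextReader(textReader))
        {
            return JsonSerializer.CreateDefault(settings).Deserialize<T>(jsonReader);
        }
    }
}
```

Note that this only removes the intermediate string. The list of samples itself would still grow without bound, which is problem #2.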

So, how to attack problem #2?

First, let's modify your data model as follows, partitioning the header data into a separate class:

public class PressureMap
{
    public double[,] PressureMatrix { get; set; }
}

public class CalibrationConfiguration 
{
    // Data model not included in question
}

public class RepresentationConfiguration 
{
    // Data model not included in question
}

public class RecordedDataHeader
{
    public string SoftwareVersion { get; set; }
    public CalibrationConfiguration CalibrationConfiguration { get; set; }
    public RepresentationConfiguration RepresentationConfiguration { get; set; }
}

public class RecordedData
{
    // Ensure the header is serialized first.
    [JsonProperty(Order = 1)]
    public RecordedDataHeader RecordedDataHeader { get; set; }
    // Ensure the pressure data is serialized last.
    [JsonProperty(Order = 2)]
    public IEnumerable<PressureMap> PressureData { get; set; }
}

Option #1 is a version of the producer-consumer pattern. It involves spinning up two threads: one to generate PressureData samples, and one to serialize the RecordedData. The first thread will generate samples and add them to a BlockingCollection<PressureMap> collection that is passed to the second thread. The second thread will then serialize BlockingCollection<PressureMap>.GetConsumingEnumerable() as the value of RecordedData.PressureData.

The following code gives a skeleton of how to do this:

var sampleCount = 400;    // Or whatever stopping criterion you prefer
var sampleInterval = 10;  // in ms

using (var pressureData = new BlockingCollection<PressureMap>())
{
    // Adapted from
    // https://docs.microsoft.com/en-us/dotnet/standard/collections/thread-safe/blockingcollection-overview
    // https://docs.microsoft.com/en-us/dotnet/api/system.collections.concurrent.blockingcollection-1?view=netframework-4.7.2

    // Spin up a Task to sample the pressure maps
    using (Task t1 = Task.Factory.StartNew(() =>
    {
        for (int i = 0; i < sampleCount; i++)
        {
            var data = GetPressureMap(i);
            Console.WriteLine("Generated sample {0}", i);
            pressureData.Add(data);
            System.Threading.Thread.Sleep(sampleInterval);
        }
        pressureData.CompleteAdding();
    }))
    {
        // Spin up a Task to consume the BlockingCollection
        using (Task t2 = Task.Factory.StartNew(() =>
        {
            var recordedDataHeader = new RecordedDataHeader
            {
                SoftwareVersion = softwareVersion,
                CalibrationConfiguration = calibrationConfiguration,
                RepresentationConfiguration = representationConfiguration,
            };

            var settings = new JsonSerializerSettings
            {
                ContractResolver = new CamelCasePropertyNamesContractResolver(),
            };

            using (var stream = new FileStream(this.filePath, FileMode.Create))
            using (var textWriter = new StreamWriter(stream))
            using (var jsonWriter = new JsonTextWriter(textWriter))
            {
                int j = 0;

                var query = pressureData
                    .GetConsumingEnumerable()
                    .Select(p => 
                            { 
                                // Flush the writer periodically in case the process terminates abnormally
                                jsonWriter.Flush();
                                Console.WriteLine("Serializing item {0}", j++);
                                return p;
                            });

                var recordedData = new RecordedData
                {
                    RecordedDataHeader = recordedDataHeader,
                    // Since PressureData is declared as IEnumerable<PressureMap>, evaluation will be lazy.
                    PressureData = query,
                };                          

                Console.WriteLine("Beginning serialization of {0} to {1}:", recordedData, this.filePath);
                JsonSerializer.CreateDefault(settings).Serialize(jsonWriter, recordedData);
                Console.WriteLine("Finished serialization of {0} to {1}.", recordedData, this.filePath);
            }
        }))
        {
            Task.WaitAll(t1, t2);
        }
    }
}

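Notes:

  • This solution uses the fact that, when serializing an IEnumerable<T>, Json.NET will not materialize the enumerable as a list. Instead it will take full advantage of lazy evaluation and simply enumerate through it, writing and then forgetting each individual item it encounters.

  • The first thread samples PressureData and adds the samples to the blocking collection.

  • The second thread wraps the blocking collection in an IEnumerable<PressureMap> and then serializes that as RecordedData.PressureData.

    During serialization, the serializer will enumerate through the IEnumerable<PressureMap>, streaming each item to the JSON file and then proceeding to the next, effectively blocking until one becomes available.

  • You will need to do some experimentation to make sure that the serialization thread can "keep up" with the sampling thread, possibly by setting a BoundedCapacity during construction. If it cannot, you may need to adopt a different strategy.

  • PressureMap GetPressureMap(int count) should be some method of yours (not shown in the question) that returns the current pressure map sample.

  • In this technique the JSON file remains open for the duration of the sampling session. If sampling terminates abnormally, the file may be truncated. I make some attempt to ameliorate the problem by flushing the writer periodically.

  • While serializing the data will no longer require unbounded amounts of memory, deserializing a RecordedData later will deserialize the PressureData array into a concrete List<PressureMap>. This may cause memory issues during downstream processing.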
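
To experiment with the bounded-capacity idea from the notes above, a minimal sketch along the following lines may help. BoundedQueueDemo and Run are hypothetical names, plain integers stand in for PressureMap samples, and the Thread.Sleep call merely simulates a slow serializer; it assumes one producer and one consumer.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; integers stand in for PressureMap samples.
public static class BoundedQueueDemo
{
    // Pushes `sampleCount` items through a queue that holds at most `capacity`
    // items and returns them in the order the consumer received them.
    public static List<int> Run(int sampleCount, int capacity)
    {
        var consumed = new List<int>();
        using (var queue = new BlockingCollection<int>(capacity))
        {
            var producer = Task.Run(() =>
            {
                try
                {
                    for (int i = 0; i < sampleCount; i++)
                        queue.Add(i);   // Blocks while the queue is full.
                }
                finally
                {
                    // Always signal completion so the consumer loop can end.
                    queue.CompleteAdding();
                }
            });

            var consumer = Task.Run(() =>
            {
                foreach (var item in queue.GetConsumingEnumerable())
                {
                    Thread.Sleep(1);    // Simulates slow serialization.
                    consumed.Add(item);
                }
            });

            Task.WaitAll(producer, consumer);
        }
        return consumed;
    }
}
```

With a bounded queue, memory use stays capped, but a blocked Add delays the producer. For a real sampler running every 10 ms that may mean late or missed samples, so an alternative worth considering is TryAdd with a timeout and an explicit policy for dropped samples.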

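Demo fiddle #1 here.

Option #2 would be to switch from a JSON file to a Newline Delimited JSON file. Such a file consists of a sequence of JSON objects separated by newline characters. In your case, you would make the first object contain the RecordedDataHeader information, and the subsequent objects be of type PressureMap: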

var sampleCount = 100; // Or whatever
var sampleInterval = 10;

var recordedDataHeader = new RecordedDataHeader
{
    SoftwareVersion = softwareVersion,
    CalibrationConfiguration = calibrationConfiguration,
    RepresentationConfiguration = representationConfiguration,
};

var settings = new JsonSerializerSettings
{
    ContractResolver = new CamelCasePropertyNamesContractResolver(),
};

// Write the header
Console.WriteLine("Beginning serialization of sample data to {0}.", this.filePath);

using (var stream = new FileStream(this.filePath, FileMode.Create))
{
    JsonExtensions.ToNewlineDelimitedJson(stream, new[] { recordedDataHeader });
}

// Write each sample incrementally

for (int i = 0; i < sampleCount; i++)
{
    Thread.Sleep(sampleInterval);
    Console.WriteLine("Performing sample {0} of {1}", i, sampleCount);
    var map = GetPressureMap(i);

    using (var stream = new FileStream(this.filePath, FileMode.Append))
    {
        JsonExtensions.ToNewlineDelimitedJson(stream, new[] { map });
    }
}

Console.WriteLine("Finished serialization of sample data to {0}.", this.filePath);

Using the extension methods:

public static partial class JsonExtensions
{
    // Adapted from the answer to
    // https://whosebug.com/questions/44787652/serialize-as-ndjson-using-json-net
    // by dbc https://whosebug.com/users/3744182/dbc
    public static void ToNewlineDelimitedJson<T>(Stream stream, IEnumerable<T> items)
    {
        // Let caller dispose the underlying stream 
        using (var textWriter = new StreamWriter(stream, new UTF8Encoding(false, true), 1024, true))
        {
            ToNewlineDelimitedJson(textWriter, items);
        }
    }

    public static void ToNewlineDelimitedJson<T>(TextWriter textWriter, IEnumerable<T> items)
    {
        var serializer = JsonSerializer.CreateDefault();

        foreach (var item in items)
        {
            // Formatting.None is the default; I set it here for clarity.
            using (var writer = new JsonTextWriter(textWriter) { Formatting = Formatting.None, CloseOutput = false })
            {
                serializer.Serialize(writer, item);
            }
            // http://specs.okfnlabs.org/ndjson/
            // Each JSON text MUST conform to the [RFC7159] standard and MUST be written to the stream followed by the newline character \n (0x0A). 
            // The newline character MAY be preceded by a carriage return \r (0x0D). The JSON texts MUST NOT contain newlines or carriage returns.
            textWriter.Write("\n");
        }
    }

    // Adapted from the answer to 
    // https://whosebug.com/questions/29729063/line-delimited-json-serializing-and-de-serializing
    // by Yuval Itzchakov https://whosebug.com/users/1870803/yuval-itzchakov
    public static IEnumerable<TBase> FromNewlineDelimitedJson<TBase, THeader, TRow>(TextReader reader)
        where THeader : TBase
        where TRow : TBase
    {
        bool first = true;

        using (var jsonReader = new JsonTextReader(reader) { CloseInput = false, SupportMultipleContent = true })
        {
            var serializer = JsonSerializer.CreateDefault();

            while (jsonReader.Read())
            {
                if (jsonReader.TokenType == JsonToken.Comment)
                    continue;
                if (first)
                {
                    yield return serializer.Deserialize<THeader>(jsonReader);
                    first = false;
                }
                else
                {
                    yield return serializer.Deserialize<TRow>(jsonReader);
                }
            }
        }
    }
}

Later, you can process the newline delimited JSON file as follows:

using (var stream = File.OpenRead(filePath))
using (var textReader = new StreamReader(stream))
{
    foreach (var obj in JsonExtensions.FromNewlineDelimitedJson<object, RecordedDataHeader, PressureMap>(textReader))
    {
        if (obj is RecordedDataHeader)
        {
            var header = (RecordedDataHeader)obj;
            // Process the header
            Console.WriteLine(JsonConvert.SerializeObject(header));
        }
        else
        {
            var row = (PressureMap)obj;
            // Process the row.
            Console.WriteLine(JsonConvert.SerializeObject(row));
        }
    }
}

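Notes:

  • This approach looks simpler because the samples are added incrementally to the end of the file, rather than inserted inside some overall JSON container.

  • With this approach both serialization and downstream processing can be done with bounded memory use.

  • The sample file does not remain open for the duration of sampling, so it is less likely to be truncated.

  • Downstream applications may not have built-in tools for processing newline delimited JSON.

  • This strategy may integrate more simply with your current threading code.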
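
If each sample is still handled on its own thread, as in the question, concurrent appends to the same file would need to be serialized somehow. One possible way, sketched under that assumption, is a small wrapper that takes a lock around each append. NdjsonAppender is a hypothetical class, not part of the answer's JsonExtensions or of Json.NET.

```csharp
using System.IO;
using System.Text;
using Newtonsoft.Json;

// Hypothetical helper: appends one JSON object per line, one writer at a time.
public sealed class NdjsonAppender
{
    private readonly object gate = new object();
    private readonly string path;
    private readonly JsonSerializer serializer = JsonSerializer.CreateDefault();

    public NdjsonAppender(string path)
    {
        this.path = path;
    }

    public void Append<T>(T item)
    {
        // Only one thread may open the file for appending at a time.
        lock (gate)
        {
            using (var stream = new FileStream(path, FileMode.Append))
            using (var textWriter = new StreamWriter(stream, new UTF8Encoding(false)))
            {
                using (var jsonWriter = new JsonTextWriter(textWriter) { Formatting = Formatting.None, CloseOutput = false })
                {
                    serializer.Serialize(jsonWriter, item);
                }
                // NDJSON: every JSON text is followed by a single \n.
                textWriter.Write("\n");
            }
        }
    }
}
```

A single shared instance would then be called from each sample thread, for example appender.Append(map). Lines are written in the order the threads acquire the lock, which is not necessarily sampling order, so keeping a timestamp in each sample may be useful.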

Demo fiddle #2 here.