C# - 将 JSON 异步解析为对象并添加到队列以进行进一步处理

C# - Parse JSON Asynchronously into Objects and add to queue for further processing

我需要对我在 .tar.gz 中收到的 JSON 文件执行类似 ETL 的操作。我已经能够成功地将文件解压缩并解压到内存流中。

我的问题是,这些文件太大,以至于我在尝试使用各种库将文件解析为对象时遇到内存问题。我已经使用了一些地方的建议来尽量不读取整个文件,但它们仍然涉及将整个文件放入内存中。

https://www.newtonsoft.com/json/help/html/Performance.htm https://www.newtonsoft.com/json/help/html/SerializingJSONFragments.htm

我想做的是异步读取文件的一小部分,尝试将读取的内容解析为对象,然后添加到适当的队列中以进行进一步处理。我希望我能尽快将这些对象从内存中删除。

例子JSON

{
  "header" : {
    "id" : 12345,
    "datetime" : 1640423287060050040,
    "version" : 1.0
  },
  "Reading" : [
    {
      "id" : 54321,
      "units" : "fps",
      "data" : [
        {
          "value"  : 32,
          "time" : 1630000000000000400
        },
        {
          "value"  : 32,
          "time" : 1630000000000000400
        },
        {
          "value"  : 32,
          "time" : 1630000000000000400
        }
      ]
    },
    {
      "id" : 765432,
      "units" : "fps",
      "data" : [
        {
          "value"  : 21,
          "time" : 1630000000000000400
        },
        {
          "value"  : 21,
          "time" : 1630000000000000400
        },
        {
          "value"  : 21,
          "time" : 1630000000000000400
        }
      ]
    }
  ]
}

由于 ReadToEndAsync()

,内存效率低下
using (var stream = _readFile(inFilePath)) //TODO Read async 
    {
        if (stream == null || stream == StreamReader.Null)
        {
            throw new Exception("streamReader is null");
        }
        
        data = JObject.Parse(await stream.ReadToEndAsync());
    }
}

我卡在哪里了

var memoryStream = UnTarGz.ExtractTarGzToStream(inFilePath);
memoryStream.Position = 0;
using (var streamReader = new StreamReader(memoryStream))
    {
        using (JsonReader reader = new JsonTextReader(streamReader))
        {
            var header = new Header();
            while (await reader.ReadAsync(CancellationToken.None))
            { 
                //if somehow I can detect header set parameter above as header data. 
                //else if its reading data build an object such that 
                // {
                //     header = header;
                //     readings = the reading data
                // }
                //
                // add to queue
            }
                    
        }
    }

Jeroen 在评论中提供了解决方案的概述。在撰写本文时,我的声誉大约是他的 5%,所以我花了一段时间才弄清楚这到底意味着什么 :) 但当我终于让它发挥作用时,我会 post 这里的结果以防万一是你要找的:

public static class Json
{
    public static void Run()
    {
        using (Stream s = File.OpenRead(@"C:\Temp\input.json")) // This is your sample.
        using (var sr = new StreamReader(s))
        using (var reader = new JsonTextReader(sr))
        {
            var serializer = new JsonSerializer();
            var readingCounter = 0;

            // While there are tokens to be read, do it and update reader with token attributes.
            while (reader.Read())
            {
                if (reader.TokenType == JsonToken.StartObject && reader.Path == "Header")
                {
                    var header = serializer.Deserialize<Header>(reader);
                    Console.WriteLine(header);
                }

                if (reader.TokenType == JsonToken.StartObject && reader.Path == $"Reading[{readingCounter}]")
                {
                    var currentReading = serializer.Deserialize<Reading>(reader);
                    readingCounter++;

                    // Process current reading object here (ex. "add to queue"):
                    Console.WriteLine(currentReading);
                }
            }
        }
    }
}

public class Header
{
    public long Id { get; set; }
    public long Datetime { get; set; }
    public string Version { get; set; }

    // For the example only:
    public override string ToString()
    {
        return Id + ", " + Datetime + ", " + Version;
    }
}

public class Reading
{
    public int Id { get; set; }
    public string Units { get; set; }
    public List<ValueTime> Data { get; set; }

    // For the example only:
    public override string ToString()
    {
        return Id + ", " + Units + ", data count = " + Data.Count;
    }
}

public class ValueTime
{
    public int Value { get; set; }
    public long Time { get; set; }
}