C# - 将 JSON 异步解析为对象并添加到队列以进行进一步处理
C# - Parse JSON Asynchronously into Objects and add to queue for further processing
我需要对收到的 .tar.gz 压缩包中的 JSON 文件执行类似 ETL 的操作。我已经能够成功地将文件解压缩(gzip)并解包(tar)到内存流中。
我的问题是,这些文件太大,以至于我在尝试使用各种库将文件解析为对象时遇到内存问题。我已经使用了一些地方的建议来尽量不读取整个文件,但它们仍然涉及将整个文件放入内存中。
https://www.newtonsoft.com/json/help/html/Performance.htm
https://www.newtonsoft.com/json/help/html/SerializingJSONFragments.htm
我想做的是异步读取文件的一小部分,尝试将读取的内容解析为对象,然后添加到适当的队列中以进行进一步处理。我希望我能尽快将这些对象从内存中删除。
示例 JSON:
{
"header" : {
"id" : 12345,
"datetime" : 1640423287060050040,
"version" : 1.0
},
"Reading" : [
{
"id" : 54321,
"units" : "fps",
"data" : [
{
"value" : 32,
"time" : 1630000000000000400
},
{
"value" : 32,
"time" : 1630000000000000400
},
{
"value" : 32,
"time" : 1630000000000000400
}
]
},
{
"id" : 765432,
"units" : "fps",
"data" : [
{
"value" : 21,
"time" : 1630000000000000400
},
{
"value" : 21,
"time" : 1630000000000000400
},
{
"value" : 21,
"time" : 1630000000000000400
}
]
}
]
}
由于使用了 ReadToEndAsync(),下面的代码内存效率低下:
// Memory-hungry approach: the reader's full contents are turned into one string
// and then parsed into a single JObject.
using (var stream = _readFile(inFilePath)) //TODO Read async
{
// Reject a missing reader as well as the StreamReader.Null placeholder.
if (stream == null || stream == StreamReader.Null)
{
throw new Exception("streamReader is null");
}
// ReadToEndAsync returns the complete text before JObject.Parse starts,
// so the whole document is held as a string and then again as a JObject.
data = JObject.Parse(await stream.ReadToEndAsync());
}
} // NOTE(review): closes an enclosing scope whose opening brace is not part of this excerpt
我卡住的地方:
// UnTarGz.ExtractTarGzToStream is defined elsewhere; its result is rewound to the
// start before being wrapped in a text reader.
var memoryStream = UnTarGz.ExtractTarGzToStream(inFilePath);
memoryStream.Position = 0;
using (var streamReader = new StreamReader(memoryStream))
{
using (JsonReader reader = new JsonTextReader(streamReader))
{
var header = new Header();
// Advance through the document one JSON token per iteration.
while (await reader.ReadAsync(CancellationToken.None))
{
// TODO: if the current token starts the header, populate `header` above.
// TODO: otherwise, if it starts a reading, build an object such that
// {
// header = header;
// readings = the reading data
// }
//
// and add it to the queue.
}
}
}
Jeroen 在评论中给出了解决方案的大致思路。在撰写本文时,我的声誉大约只有他的 5%,所以我花了一段时间才弄清楚他的意思 :) 不过我最终让它跑通了,所以把结果贴在这里,以防这正是你要找的:
/// <summary>
/// Walks a JSON document token by token and deserializes one top-level
/// <c>header</c> object or one <c>Reading</c> array element at a time,
/// instead of parsing the whole document into a single object graph.
/// </summary>
public static class Json
{
    /// <summary>
    /// Reads <c>C:\Temp\input.json</c> and writes the header and each reading
    /// to the console as soon as it has been deserialized.
    /// </summary>
    public static void Run()
    {
        using (Stream s = File.OpenRead(@"C:\Temp\input.json")) // This is your sample.
        using (var sr = new StreamReader(s))
        using (var reader = new JsonTextReader(sr))
        {
            var serializer = new JsonSerializer();
            var readingCounter = 0;

            // While there are tokens to be read, do it and update reader with token attributes.
            while (reader.Read())
            {
                // Only the start of an object can be the header or a reading.
                if (reader.TokenType != JsonToken.StartObject)
                {
                    continue;
                }

                // reader.Path repeats the property name exactly as it is written in the
                // document. The sample spells the key "header" in lower case, so a
                // case-sensitive comparison against "Header" would silently skip it.
                if (string.Equals(reader.Path, "Header", StringComparison.OrdinalIgnoreCase))
                {
                    var header = serializer.Deserialize<Header>(reader);
                    Console.WriteLine(header);
                }
                else if (string.Equals(reader.Path, $"Reading[{readingCounter}]", StringComparison.OrdinalIgnoreCase))
                {
                    // Deserialize consumes the whole element, so nested "data" objects
                    // are never seen by this loop.
                    var currentReading = serializer.Deserialize<Reading>(reader);
                    readingCounter++;
                    // Process current reading object here (ex. "add to queue"):
                    Console.WriteLine(currentReading);
                }
            }
        }
    }
}
/// <summary>
/// Target type for the document's header object (id, datetime, version).
/// </summary>
public class Header
{
    /// <summary>Header identifier.</summary>
    public long Id { get; set; }

    /// <summary>Header timestamp, kept as the raw integer found in the document.</summary>
    public long Datetime { get; set; }

    /// <summary>Version value, held as text.</summary>
    public string Version { get; set; }

    /// <summary>
    /// For the example only: renders the three values separated by ", ".
    /// </summary>
    public override string ToString() => $"{Id}, {Datetime}, {Version}";
}
/// <summary>
/// Target type for one element of the document's <c>Reading</c> array.
/// </summary>
public class Reading
{
    /// <summary>Reading identifier.</summary>
    public int Id { get; set; }

    /// <summary>Unit label for the values (the sample uses "fps").</summary>
    public string Units { get; set; }

    /// <summary>
    /// Value/time samples of this reading; stays <c>null</c> until something assigns it.
    /// </summary>
    public List<ValueTime> Data { get; set; }

    /// <summary>
    /// For the example only: renders id, units and the number of samples.
    /// </summary>
    public override string ToString()
    {
        // Data is never initialized by this class, so report 0 samples
        // instead of throwing NullReferenceException when it was not set.
        return Id + ", " + Units + ", data count = " + (Data?.Count ?? 0);
    }
}
/// <summary>
/// One sample inside a reading's <c>data</c> array: a value and the time it belongs to.
/// </summary>
public class ValueTime
{
    /// <summary>The sampled value.</summary>
    public int Value
    {
        get;
        set;
    }

    /// <summary>The sample's timestamp, kept as the raw integer found in the document.</summary>
    public long Time
    {
        get;
        set;
    }
}
我需要对收到的 .tar.gz 压缩包中的 JSON 文件执行类似 ETL 的操作。我已经能够成功地将文件解压缩(gzip)并解包(tar)到内存流中。
我的问题是,这些文件太大,以至于我在尝试使用各种库将文件解析为对象时遇到内存问题。我已经使用了一些地方的建议来尽量不读取整个文件,但它们仍然涉及将整个文件放入内存中。
https://www.newtonsoft.com/json/help/html/Performance.htm https://www.newtonsoft.com/json/help/html/SerializingJSONFragments.htm
我想做的是异步读取文件的一小部分,尝试将读取的内容解析为对象,然后添加到适当的队列中以进行进一步处理。我希望我能尽快将这些对象从内存中删除。
示例 JSON:
{
"header" : {
"id" : 12345,
"datetime" : 1640423287060050040,
"version" : 1.0
},
"Reading" : [
{
"id" : 54321,
"units" : "fps",
"data" : [
{
"value" : 32,
"time" : 1630000000000000400
},
{
"value" : 32,
"time" : 1630000000000000400
},
{
"value" : 32,
"time" : 1630000000000000400
}
]
},
{
"id" : 765432,
"units" : "fps",
"data" : [
{
"value" : 21,
"time" : 1630000000000000400
},
{
"value" : 21,
"time" : 1630000000000000400
},
{
"value" : 21,
"time" : 1630000000000000400
}
]
}
]
}
由于使用了 ReadToEndAsync(),下面的代码内存效率低下:
using (var stream = _readFile(inFilePath)) //TODO Read async
{
if (stream == null || stream == StreamReader.Null)
{
throw new Exception("streamReader is null");
}
data = JObject.Parse(await stream.ReadToEndAsync());
}
}
我卡住的地方:
// UnTarGz.ExtractTarGzToStream is defined elsewhere; its result is rewound to the
// start before being wrapped in a text reader.
var memoryStream = UnTarGz.ExtractTarGzToStream(inFilePath);
memoryStream.Position = 0;
using (var streamReader = new StreamReader(memoryStream))
{
using (JsonReader reader = new JsonTextReader(streamReader))
{
var header = new Header();
// Advance through the document one JSON token per iteration.
while (await reader.ReadAsync(CancellationToken.None))
{
// TODO: if the current token starts the header, populate `header` above.
// TODO: otherwise, if it starts a reading, build an object such that
// {
// header = header;
// readings = the reading data
// }
//
// and add it to the queue.
}
}
}
Jeroen 在评论中给出了解决方案的大致思路。在撰写本文时,我的声誉大约只有他的 5%,所以我花了一段时间才弄清楚他的意思 :) 不过我最终让它跑通了,所以把结果贴在这里,以防这正是你要找的:
/// <summary>
/// Walks a JSON document token by token and deserializes one top-level
/// <c>header</c> object or one <c>Reading</c> array element at a time,
/// instead of parsing the whole document into a single object graph.
/// </summary>
public static class Json
{
    /// <summary>
    /// Reads <c>C:\Temp\input.json</c> and writes the header and each reading
    /// to the console as soon as it has been deserialized.
    /// </summary>
    public static void Run()
    {
        using (Stream s = File.OpenRead(@"C:\Temp\input.json")) // This is your sample.
        using (var sr = new StreamReader(s))
        using (var reader = new JsonTextReader(sr))
        {
            var serializer = new JsonSerializer();
            var readingCounter = 0;

            // While there are tokens to be read, do it and update reader with token attributes.
            while (reader.Read())
            {
                // Only the start of an object can be the header or a reading.
                if (reader.TokenType != JsonToken.StartObject)
                {
                    continue;
                }

                // reader.Path repeats the property name exactly as it is written in the
                // document. The sample spells the key "header" in lower case, so a
                // case-sensitive comparison against "Header" would silently skip it.
                if (string.Equals(reader.Path, "Header", StringComparison.OrdinalIgnoreCase))
                {
                    var header = serializer.Deserialize<Header>(reader);
                    Console.WriteLine(header);
                }
                else if (string.Equals(reader.Path, $"Reading[{readingCounter}]", StringComparison.OrdinalIgnoreCase))
                {
                    // Deserialize consumes the whole element, so nested "data" objects
                    // are never seen by this loop.
                    var currentReading = serializer.Deserialize<Reading>(reader);
                    readingCounter++;
                    // Process current reading object here (ex. "add to queue"):
                    Console.WriteLine(currentReading);
                }
            }
        }
    }
}
/// <summary>
/// Target type for the document's header object (id, datetime, version).
/// </summary>
public class Header
{
    /// <summary>Header identifier.</summary>
    public long Id { get; set; }

    /// <summary>Header timestamp, kept as the raw integer found in the document.</summary>
    public long Datetime { get; set; }

    /// <summary>Version value, held as text.</summary>
    public string Version { get; set; }

    /// <summary>
    /// For the example only: renders the three values separated by ", ".
    /// </summary>
    public override string ToString() => $"{Id}, {Datetime}, {Version}";
}
/// <summary>
/// Target type for one element of the document's <c>Reading</c> array.
/// </summary>
public class Reading
{
    /// <summary>Reading identifier.</summary>
    public int Id { get; set; }

    /// <summary>Unit label for the values (the sample uses "fps").</summary>
    public string Units { get; set; }

    /// <summary>
    /// Value/time samples of this reading; stays <c>null</c> until something assigns it.
    /// </summary>
    public List<ValueTime> Data { get; set; }

    /// <summary>
    /// For the example only: renders id, units and the number of samples.
    /// </summary>
    public override string ToString()
    {
        // Data is never initialized by this class, so report 0 samples
        // instead of throwing NullReferenceException when it was not set.
        return Id + ", " + Units + ", data count = " + (Data?.Count ?? 0);
    }
}
/// <summary>
/// One sample inside a reading's <c>data</c> array: a value and the time it belongs to.
/// </summary>
public class ValueTime
{
    /// <summary>The sampled value.</summary>
    public int Value
    {
        get;
        set;
    }

    /// <summary>The sample's timestamp, kept as the raw integer found in the document.</summary>
    public long Time
    {
        get;
        set;
    }
}