Parse huge OData JSON by streaming certain sections of the JSON to avoid the LOH

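I have an OData response as JSON (a few MB in size), and the requirement is to stream certain parts of the JSON without ever loading them into memory.

For example: when I read the property "value[0].Body.Content" in the JSON below (its value will be several MB in size), I want to stream that value without deserializing it into an object of type string. So basically, read the value into a fixed-size byte array and write that byte array to the destination stream (repeating the step until all of the data has been processed).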
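
To make the intended copy loop concrete, here is a minimal sketch of the "fixed-size buffer" step on its own. It copies an entire stream, so it does not solve the actual problem of locating and unescaping one JSON string value; the class name `ChunkedCopyDemo`, the method name `CopyInChunks`, and the 8192-byte default are assumptions made for illustration only.

```csharp
using System;
using System.IO;
using System.Text;

public static class ChunkedCopyDemo
{
    // Copies everything from source to destination through one reusable buffer,
    // so no allocation larger than bufferSize is made by this method.
    public static long CopyInChunks(Stream source, Stream destination, int bufferSize = 8192)
    {
        var buffer = new byte[bufferSize];
        long total = 0;
        int read;
        while ((read = source.Read(buffer, 0, buffer.Length)) > 0)
        {
            destination.Write(buffer, 0, read);
            total += read;
        }
        return total;
    }

    public static void Main()
    {
        // Hypothetical payload standing in for the large "Content" value.
        var payload = Encoding.UTF8.GetBytes(new string('x', 200000));
        using (var source = new MemoryStream(payload))
        using (var destination = new MemoryStream())
        {
            long copied = CopyInChunks(source, destination);
            Console.WriteLine(copied == payload.Length);
        }
    }
}
```

The buffer here is well under the LOH threshold, which is the point of the requirement: the value should only ever exist in memory one buffer at a time.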

JSON:

{
    "@odata.context": "https://localhost:5555/api/v2.0/$metadata#Me/Messages",
    "value": [
        {
            "@odata.id": "https://localhost:5555/api/v2.0/",
            "@odata.etag": "W/\"Something\"",
            "Id": "vccvJHDSFds43hwy98fh",
            "CreatedDateTime": "2018-12-01T01:47:53Z",
            "LastModifiedDateTime": "2018-12-01T01:47:53Z",
            "ChangeKey": "SDgf43tsdf",
            "WebLink": "https://localhost:5555/?ItemID=dfsgsdfg9876ijhrf",
            "Body": {
                "ContentType": "HTML",
                "Content": "<html>\r\n<body>Huge Data Here\r\n</body>\r\n</html>\r\n"
            },
            "ToRecipients": [{
                    "EmailAddress": {
                        "Name": "ME",
                        "Address": "me@me.com"
                    }
                }
            ],
            "CcRecipients": [],
            "BccRecipients": [],
            "ReplyTo": [],
            "Flag": {
                "FlagStatus": "NotFlagged"
            }
        }
    ],
    "@odata.nextLink": "http://localhost:5555/rest/jersey/sleep?%24filter=LastDeliveredDateTime+ge+2018-12-01+and+LastDeliveredDateTime+lt+2018-12-02&%24top=50&%24skip=50"
}

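Approaches tried:
1. Newtonsoft

I initially tried Newtonsoft streaming, but it internally converts the data into a string and loads it into memory. (This causes the LOH to shoot up, and the memory is not released until compaction happens. Our worker process has a memory limit and cannot keep this in memory.)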

**code:**

    using (var jsonTextReader = new JsonTextReader(sr))
    {
        var pool = new CustomArrayPool();
        // Checking if pooling will help with memory
        jsonTextReader.ArrayPool = pool;

        while (jsonTextReader.Read())
        {
            if (jsonTextReader.TokenType == JsonToken.PropertyName
                && ((string)jsonTextReader.Value).Equals("value"))
            {
                jsonTextReader.Read();

                if (jsonTextReader.TokenType == JsonToken.StartArray)
                {
                    while (jsonTextReader.Read())
                    {
                        if (jsonTextReader.TokenType == JsonToken.StartObject)
                        {
                            var Current = JToken.Load(jsonTextReader);
                            // By Now, the LOH Shoots up.
                            // Avoid below code of converting this JToken back to byte array.
                            var bytes = Encoding.ASCII.GetBytes(Current.ToString());
                            destinationStream.Write(bytes, 0, bytes.Length);
                        }
                        else if (jsonTextReader.TokenType == JsonToken.EndArray)
                        {
                            break;
                        }
                    }
                }
            }

            if (jsonTextReader.TokenType == JsonToken.StartObject)
            {
                var Current = JToken.Load(jsonTextReader);
                // Do some processing with Current
                var bytes = Encoding.ASCII.GetBytes(Current.ToString());
                destinationStream.Write(bytes, 0, bytes.Length);
            }
        }
    }
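
As background for why the attempt above hits the LOH, the following sketch shows how a large string is classified by the runtime. It assumes the default large-object threshold of roughly 85,000 bytes; the class name `LohDemo` and the string sizes are chosen for illustration only.

```csharp
using System;

public static class LohDemo
{
    public static void Main()
    {
        // About 2 KB of character data: allocated on the normal small object heap.
        var small = new string('x', 1000);

        // About 200 KB of character data: above the default large-object threshold,
        // so it is allocated on the LOH, which the GC reports as the oldest generation.
        var large = new string('x', 100000);

        Console.WriteLine(GC.GetGeneration(small));
        Console.WriteLine(GC.GetGeneration(large) == GC.MaxGeneration);
    }
}
```

Any reader that materializes a multi-megabyte "Content" value as a single string produces an allocation of the second kind, whatever is done with the string afterwards.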

2. OData.Net:

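    I was wondering whether this could be done with the OData.Net library, since it looks like it supports streaming of string fields. But I couldn't get far, because I ended up creating a model for the data, which means the value would be converted into a single string object several MB in size.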

    **code:**

    ODataMessageReaderSettings settings = new ODataMessageReaderSettings();
    IODataResponseMessage responseMessage = new InMemoryMessage { Stream = stream };
    responseMessage.SetHeader("Content-Type", "application/json;odata.metadata=minimal;");
    // ODataMessageReader reader = new ODataMessageReader((IODataResponseMessage)message, settings, GetEdmModel());
    ODataMessageReader reader = new ODataMessageReader(responseMessage, settings, new EdmModel());
    var oDataResourceReader = reader.CreateODataResourceReader();
    var property = reader.ReadProperty();
    


Any idea how to parse this JSON partially using OData.Net/Newtonsoft while streaming the values of certain fields?
Or is the only way to parse the stream manually?

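If you are copying portions of JSON from one stream to another, you can do this more efficiently with JsonWriter.WriteToken(JsonReader), which avoids the intermediate Current = JToken.Load(jsonTextReader) and Encoding.ASCII.GetBytes(Current.ToString()) representations and their associated memory overhead: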

using (var textWriter = new StreamWriter(destinationStream, new UTF8Encoding(false, true), 1024, true))
using (var jsonWriter = new JsonTextWriter(textWriter) { Formatting = Formatting.Indented, CloseOutput = false })
{
    // Use Formatting.Indented or Formatting.None as required.
    jsonWriter.WriteToken(jsonTextReader);
}

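However, Json.NET's JsonTextReader does not have the ability to read a single string value in "chunks" in the same way as XmlReader.ReadValueChunk(). It will always fully materialize each atomic string value. If your string values are so large that they end up on the large object heap, even using JsonWriter.WriteToken() will not prevent those strings from being completely loaded into memory.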

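As an alternative, you might consider the readers and writers returned by JsonReaderWriterFactory. These readers and writers are used by DataContractJsonSerializer and translate JSON to XML on-the-fly as it is being read and written. Since their base classes are XmlReader and XmlWriter, they do support reading and writing string values in chunks. Using them appropriately will avoid allocating strings on the large object heap.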
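
To illustrate chunked reading in isolation, here is a rough sketch that streams the text of one string-valued property through a fixed-size char buffer using XmlReader.ReadValueChunk(). The names `ChunkedJsonStringDemo` and `CopyFirstStringValue`, the 4096-char buffer, and the assumption that the first element with the given name holds a plain string are all mine; treat it as a sketch of the idea rather than a hardened implementation.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization.Json;
using System.Text;
using System.Xml;

public static class ChunkedJsonStringDemo
{
    // Writes the text of the first element named elementName to output,
    // one buffer at a time, and returns the number of chars written.
    public static long CopyFirstStringValue(Stream json, string elementName, TextWriter output, int bufferSize = 4096)
    {
        var buffer = new char[bufferSize];
        long total = 0;
        using (var reader = JsonReaderWriterFactory.CreateJsonReader(json, XmlDictionaryReaderQuotas.Max))
        {
            while (reader.Read())
            {
                if (reader.NodeType != XmlNodeType.Element || reader.LocalName != elementName)
                    continue;
                if (reader.IsEmptyElement)
                    return total;

                // A long value may be surfaced as several consecutive text nodes,
                // so keep reading until the element ends.
                while (reader.Read() && reader.NodeType != XmlNodeType.EndElement)
                {
                    if (reader.NodeType == XmlNodeType.Element)
                        throw new InvalidOperationException("Element does not hold a simple string value.");
                    if (!reader.HasValue)
                        continue;

                    int n;
                    while ((n = reader.ReadValueChunk(buffer, 0, buffer.Length)) > 0)
                    {
                        output.Write(buffer, 0, n);
                        total += n;
                    }
                }
                return total;
            }
        }
        return total;
    }

    public static void Main()
    {
        // Hypothetical document shaped like the question's JSON, with a large Content value.
        var content = new string('a', 100000);
        var json = "{\"value\":[{\"Body\":{\"ContentType\":\"HTML\",\"Content\":\"" + content + "\"}}]}";
        using (var input = new MemoryStream(Encoding.UTF8.GetBytes(json)))
        using (var output = new StringWriter())
        {
            long written = CopyFirstStringValue(input, "Content", output);
            Console.WriteLine(written == content.Length);
        }
    }
}
```

In real use the TextWriter would be a StreamWriter over the destination stream rather than a StringWriter, since collecting the output in a string would defeat the purpose.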

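To do this, first define the following helper and extension methods, which copy a selected subset of JSON values from an input stream to an output stream, as specified by a path to the data to be streamed: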

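using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.Serialization.Json;
using System.Text;
using System.Xml;
using System.Xml.Linq;

public static class JsonExtensions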
{
    public static void StreamNested(Stream from, Stream to, string [] path)
    {
        var reversed = path.Reverse().ToArray();

        using (var xr = JsonReaderWriterFactory.CreateJsonReader(from, XmlDictionaryReaderQuotas.Max))
        {
            foreach (var subReader in xr.ReadSubtrees(s => s.Select(n => n.LocalName).SequenceEqual(reversed)))
            {
                using (var xw = JsonReaderWriterFactory.CreateJsonWriter(to, Encoding.UTF8, false))
                {
                    subReader.MoveToContent();

                    xw.WriteStartElement("root");
                    xw.WriteAttributes(subReader, true);

                    subReader.Read();

                    while (!subReader.EOF)
                    {
                        if (subReader.NodeType == XmlNodeType.Element && subReader.Depth == 1)
                            xw.WriteNode(subReader, true);
                        else
                            subReader.Read();
                    }

                    xw.WriteEndElement();
                }
            }
        }
    }
}

public static class XmlReaderExtensions
{
    public static IEnumerable<XmlReader> ReadSubtrees(this XmlReader xmlReader, Predicate<Stack<XName>> filter)
    {
        Stack<XName> names = new Stack<XName>();

        while (xmlReader.Read())
        {
            if (xmlReader.NodeType == XmlNodeType.Element)
            {
                names.Push(XName.Get(xmlReader.LocalName, xmlReader.NamespaceURI));
                if (filter(names))
                {
                    using (var subReader = xmlReader.ReadSubtree())
                    {
                        yield return subReader;
                    }
                }
            }

            if ((xmlReader.NodeType == XmlNodeType.Element && xmlReader.IsEmptyElement)
                || xmlReader.NodeType == XmlNodeType.EndElement)
            {
                names.Pop();
            }
        }
    }
}

Now, the string [] path argument to StreamNested() is not any sort of JSONPath path. Instead, it is a path through the hierarchy of XML elements that corresponds to the JSON you want to select, as translated by the XmlReader returned by JsonReaderWriterFactory.CreateJsonReader(). The mapping used for this translation is, in turn, documented by Microsoft in Mapping Between JSON and XML. To select and stream only those JSON values matching value[*], the required XML path is //root/value/item. Thus, you can select and stream the desired nested objects as follows:

JsonExtensions.StreamNested(inputStream, destinationStream, new[] { "root", "value", "item" });

Notes:

  • Mapping Between JSON and XML is somewhat complex. It is often easier just to load some sample JSON into an XDocument using the following helper method:

    static XDocument ParseJsonAsXDocument(string json)
    {
        using (var xr = JsonReaderWriterFactory.CreateJsonReader(new MemoryStream(Encoding.UTF8.GetBytes(json)), Encoding.UTF8, XmlDictionaryReaderQuotas.Max, null))
        {
            return XDocument.Load(xr);
        }
    }
    

    Then determine the correct XML path by observation.

  • For a related question, see
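
As a sketch of the inspection step from the first note, the following lists the distinct element paths that the JSON-to-XML translation produces for a small sample, so the path array for StreamNested() can be read off directly. The names `JsonXmlPathDemo` and `ListElementPaths` and the sample JSON are hypothetical.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Runtime.Serialization.Json;
using System.Text;
using System.Xml;
using System.Xml.Linq;

public static class JsonXmlPathDemo
{
    // Loads the JSON through the JSON-to-XML reader and returns each distinct
    // element path, written root-first with '/' separators.
    public static string[] ListElementPaths(string json)
    {
        using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(json)))
        using (var reader = JsonReaderWriterFactory.CreateJsonReader(stream, XmlDictionaryReaderQuotas.Max))
        {
            var doc = XDocument.Load(reader);
            return doc.Descendants()
                .Select(e => string.Join("/", e.AncestorsAndSelf().Reverse().Select(a => a.Name.LocalName)))
                .Distinct()
                .ToArray();
        }
    }

    public static void Main()
    {
        // Small stand-in for the OData response; keep samples tiny, since this
        // loads the whole document into memory.
        var json = "{\"value\":[{\"Id\":\"1\",\"Body\":{\"Content\":\"x\"}}]}";
        foreach (var path in ListElementPaths(json))
            Console.WriteLine(path);
    }
}
```

For this sample the printed paths should include root/value/item, matching the path passed to StreamNested() above.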