How to split a large JSON file based on an array property which is deeply nested?

I have a large JSON file (about 16 GB) with the following structure:

{
  "Job": {
    "Keys": {
      "JobID": "test123",
      "DeviceID": "TEST01"
    },
    "Props": {
      "FileType": "Measurements",
      "InstrumentDescriptions": [
        {
          "InstrumentID": "1723007",
          "InstrumentType": "Actual1",
          "Name": "U",
          "DataType": "Double",
          "Units": "degC"
        },
        {
          "InstrumentID": "2424009",
          "InstrumentType": "Actual2",
          "Name": "VG03",
          "DataType": "Double",
          "Units": "Pa"
        }
      ]
    },
    "Steps": [
      {
        "Keys": {
          "StepID": "START",
          "StepResult": "NormalEnd"
        },
        "InstrumentData": [
          {
            "Keys": {
              "InstrumentID": "1723007"
            },
            "Measurements": [
              {
                "DateTime": "2021-11-16 21:18:37.000",
                "Value": 540
              },
              {
                "DateTime": "2021-11-16 21:18:37.100",
                "Value": 539
              },
              {
                "DateTime": "2021-11-16 21:18:37.200",
                "Value": 540
              },
              {
                "DateTime": "2021-11-16 21:18:37.300",
                "Value": 540
              },
              {
                "DateTime": "2021-11-16 21:18:37.400",
                "Value": 540
              },
              {
                "DateTime": "2021-11-16 21:18:37.500",
                "Value": 540
              },
              {
                "DateTime": "2021-11-16 21:18:37.600",
                "Value": 540
              },
              {
                "DateTime": "2021-11-16 21:18:37.700",
                "Value": 538
              },
              {
                "DateTime": "2021-11-16 21:18:37.800",
                "Value": 540
              }
            ]
          },
          {
            "Keys": {
              "InstrumentID": "2424009"
            },
            "Measurements": [
              {
                "DateTime": "2021-11-16 21:18:37.000",
                "Value": 1333.22
              },
              {
                "DateTime": "2021-11-16 21:18:37.100",
                "Value": 1333.22
              },
              {
                "DateTime": "2021-11-16 21:18:37.200",
                "Value": 1333.22
              },
              {
                "DateTime": "2021-11-16 21:18:37.300",
                "Value": 1333.22
              },
              {
                "DateTime": "2021-11-16 21:18:37.400",
                "Value": 1333.22
              },
              {
                "DateTime": "2021-11-16 21:18:37.500",
                "Value": 1333.22
              },
              {
                "DateTime": "2021-11-16 21:18:37.600",
                "Value": 1333.22
              },
              {
                "DateTime": "2021-11-16 21:18:37.700",
                "Value": 1333.22
              },
              {
                "DateTime": "2021-11-16 21:18:37.800",
                "Value": 1333.22
              }
            ]
          }
        ]
      }
    ]
  }
}

Problem

I want to split this file into multiple files by splitting on the "InstrumentData" array, since that array holds the bulk of the data. Splitting the file into smaller pieces would let me parse it without running into out-of-memory exceptions.

Current state

public static void SplitJson(string filename, string arrayPropertyName)
    {
        string templateFileName = @"C:\Temp\template.json";
        string arrayFileName = @"C:\Temp\array.json";

        CreateEmptyFile(templateFileName);
        CreateEmptyFile(arrayFileName);

        using (Stream stream = File.OpenRead(filename))
        using (JsonReader reader = new JsonTextReader(new StreamReader(stream)))
        using (JsonWriter templateWriter = new JsonTextWriter(new StreamWriter(templateFileName)))
        using (JsonWriter arrayWriter = new JsonTextWriter(new StreamWriter(arrayFileName)))
        {
            if (reader.Read() && reader.TokenType == JsonToken.StartObject)
            {
                templateWriter.WriteStartObject();
                while (reader.Read() && reader.TokenType != JsonToken.EndObject)
                {
                    string propertyName = (string)reader.Value;
                    reader.Read();
                    templateWriter.WritePropertyName(propertyName);
                    if (propertyName == arrayPropertyName)
                    {
                        arrayWriter.WriteToken(reader);
                        templateWriter.WriteStartObject();  // empty placeholder object
                        templateWriter.WriteEndObject();
                    }
                    else if (reader.TokenType == JsonToken.StartObject ||
                             reader.TokenType == JsonToken.StartArray)
                    {
                        templateWriter.WriteToken(reader);
                    }
                    else
                    {
                        templateWriter.WriteValue(reader.Value);
                    }
                }
                templateWriter.WriteEndObject();
            }
        }

        // Now read the huge array file and combine each item in the array
        // with the template to make new files
        JObject template = JObject.Parse(File.ReadAllText(templateFileName));
        using (JsonReader arrayReader = new JsonTextReader(new StreamReader(arrayFileName)))
        {
            int counter = 0;
            while (arrayReader.Read())
            {
                if (arrayReader.TokenType == JsonToken.StartObject)
                {
                    counter++;
                    JObject item = JObject.Load(arrayReader);
                    template[arrayPropertyName] = item;
                    string fileName = string.Format(@"C:\Temp\output_{0}_{1}_{2}.json",
                                                    template["name"], template["age"], counter);

                    File.WriteAllText(fileName, template.ToString());
                }
            }
        }

        // Clean up temporary files
        File.Delete(templateFileName);
        File.Delete(arrayFileName);
    }

I am using this approach to try to split the file into smaller files. However, it can only split on a property at the root level.
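For reference, this is the kind of call being made today (the path is just a placeholder). Because the reader loop only walks the properties of the root object, passing a nested property name never matches anything:

SplitJson(@"C:\Temp\measurements.json", "InstrumentData");
// Never matches: "InstrumentData" lives at Job.Steps[*].InstrumentData, not at the root,
// so the entire "Job" value is copied into template.json and array.json stays empty.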

Questions

Am I on the right track here? Is this an efficient way to solve the problem? How can I split the JSON into multiple files by splitting the array efficiently? The JSON file should be split so that each element of the "InstrumentData" array ends up in its own file, with all other properties and structure preserved in each split file.
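For the sample above, the first output file would then look roughly like this (only an illustration of the desired result; the InstrumentDescriptions and Measurements arrays are truncated with "..." for brevity):

{
  "Job": {
    "Keys": { "JobID": "test123", "DeviceID": "TEST01" },
    "Props": { "FileType": "Measurements", "InstrumentDescriptions": [ ... ] },
    "Steps": [
      {
        "Keys": { "StepID": "START", "StepResult": "NormalEnd" },
        "InstrumentData": [
          {
            "Keys": { "InstrumentID": "1723007" },
            "Measurements": [ { "DateTime": "2021-11-16 21:18:37.000", "Value": 540 }, ... ]
          }
        ]
      }
    ]
  }
}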

This is a somewhat ugly solution and definitely not the best one, but it lets you split a similarly structured JSON file several gigabytes in size into smaller files, each containing a single array member, processing the input line by line. It preserves the original redundant indentation and trailing commas, but those can be fixed up afterwards if necessary.

using var inputStream = File.OpenText("./input.json");

// Searching for the beginning of the array by the "InstrumentData" key
string? line;
while ((line = inputStream.ReadLine()) != null)
{
  if (line.Contains("\"InstrumentData\""))
    break;
}

// End of file reached, exiting
if (line == null)
  return;

// Reading and splitting the "InstrumentData" array.
StreamWriter? outputFile = null;
var outputFilesCounter = 0;
var arrayLevel = 0;
var objectLevel = 0;
try
{
  while ((line = inputStream.ReadLine()) != null)
  {
    // Track the levels of nesting within the array
    arrayLevel += line.Count(c => c == '[');
    objectLevel += line.Count(c => c == '{');

    // Write a line into the currently opened output file stream
    if (objectLevel > 0)
    {
      outputFile ??= File.CreateText($"./output_{outputFilesCounter++}.json");
      outputFile.WriteLine(line);
    }

    arrayLevel -= line.Count(c => c == ']');
    objectLevel -= line.Count(c => c == '}');

    // End of an array member, flush the file stream
    if (objectLevel == 0)
    {
      outputFile?.Dispose();
      outputFile = null;
    }

    // End of array reached, exiting
    if (arrayLevel < 0)
    {
      outputFile?.Dispose();
      return;
    }
  }
}
finally
{
  outputFile?.Dispose();
}
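If each fragment needs to be valid standalone JSON, a post-processing pass can strip the trailing comma and re-serialize with Json.NET to normalize the carried-over indentation. A minimal sketch, assuming the fragments match the output_*.json naming used above:

foreach (var path in Directory.EnumerateFiles(".", "output_*.json"))
{
    var text = File.ReadAllText(path).TrimEnd();
    if (text.EndsWith(","))
        text = text.Substring(0, text.Length - 1); // drop the trailing comma copied from the source
    // Re-parse and re-indent to remove the redundant indentation mentioned above
    var token = Newtonsoft.Json.Linq.JToken.Parse(text);
    File.WriteAllText(path, token.ToString(Newtonsoft.Json.Formatting.Indented));
}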

It isn't entirely clear from your question what you mean by splitting the JSON by splitting the "InstrumentData" array. The array in question sits at the path "Job.Steps[*].InstrumentData[*]", so are you effectively splitting the containing array "Job.Steps[*]" as well? And what about prefix and suffix properties such as "Job.Keys" — what do you want done with them?

One way to define and implement the split is to adapt the approach from [the related question] to work with nested arrays. In that question, the prefix and suffix properties are retained in each split file, while the array being split is broken into chunks. There the array property was at the root level, but in your case you need to specify a path to the array property, and if the array to be split is deeply nested inside other array values, those values also need to be split.

Assuming that is what you want, the following extension methods should do the job:

public static partial class JsonExtensions
{
    public static string [] SplitJsonFile(string fileName, string [] splitPath, Func<string, string, int, string, string> nameCreator)
    {
        List<string> fileNames = new List<string>();
        
        var name = Path.GetFileNameWithoutExtension(fileName);
        var ext = Path.GetExtension(fileName);
        var directory = Path.GetDirectoryName(fileName);
        Func<int, TextWriter> createStream = (i) => 
        {
            // Use whatever method you like to generate a name for each fragment.
            var newName = nameCreator(directory, name, i, ext);
            var writer = new StreamWriter(newName, false, Encoding.UTF8);
            fileNames.Add(newName);
            return writer;
        };

        using (var reader = new StreamReader(fileName, Encoding.UTF8))
        {
            JsonExtensions.SplitJson(reader,splitPath, 1, createStream, Formatting.Indented);
        }

        return fileNames.ToArray();
    }
    
    public static void SplitJson(TextReader textReader, IList<string> splitPath, long maxItems, Func<int, TextWriter> createStream, Formatting formatting)
    {
        if (splitPath == null || createStream == null || textReader == null)
            throw new ArgumentNullException();
        if (splitPath.Count < 1 || maxItems < 1)
            throw new ArgumentException();
        using (var reader = new JsonTextReader(textReader))
        {
            List<JsonWriter> writers = new ();
            List<ParentToken> parentTokens = new ();
            try
            {
                SplitJson(reader, splitPath, 0, maxItems, createStream, formatting, parentTokens, writers);
            }
            finally
            {
                // Make sure files are closed in the event of an exception.
                foreach (IDisposable writer in writers)
                    writer?.Dispose();
            }
        }
    }
    
    struct ParentToken
    {
        public ParentToken(JsonToken tokenType, IList<JToken> prefixTokens = default) => (this.TokenType, this._prefixTokens) = (tokenType, prefixTokens);
        readonly IList<JToken> _prefixTokens;
        public JsonToken TokenType { get; }
        public IList<JToken> PrefixTokens => _prefixTokens ?? Array.Empty<JToken>();
    }
    
    static JsonWriter AddWriter(List<JsonWriter> writers, List<ParentToken> parentTokens, Func<int, TextWriter> createStream, Formatting formatting)
    {
        var writer = new JsonTextWriter(createStream(writers.Count)) { Formatting = formatting, AutoCompleteOnClose = false };
        writers.Add(writer);
        foreach (var parent in parentTokens)
        {
            switch (parent.TokenType)
            {
                case JsonToken.StartObject:
                    writer.WriteStartObject();
                    break;
                case JsonToken.StartArray:
                    writer.WriteStartArray();
                    break;
                default:
                    throw new JsonException();
            }
            for (int i = 0; i < parent.PrefixTokens.Count; i++)
            {
                if (i == parent.PrefixTokens.Count - 1 && parent.PrefixTokens[i] is JProperty property && property.Value.Type == JTokenType.Undefined)
                    writer.WritePropertyName(property.Name);
                else
                    parent.PrefixTokens[i].WriteTo(writer);
            }
        }
        return writer;
    }
    
    static (JsonWriter, int) GetCurrentWriter(List<JsonWriter> writers, List<ParentToken> parentTokens, Func<int, TextWriter> createStream, Formatting formatting)
        => writers.Count == 0 ? (AddWriter(writers, parentTokens, createStream, formatting), 0) : (writers[writers.Count-1], writers.Count-1);
    
    static void SplitJson(JsonTextReader reader, IList<string> splitPath, int index, long maxItems, Func<int, TextWriter> createStream, Formatting formatting, List<ParentToken> parentTokens , List<JsonWriter> writers)
    {
        var startTokenType = reader.MoveToContentAndAssert().TokenType;
        var startReaderDepth = reader.Depth;

        var bottom = index >= splitPath.Count;
        
        switch (startTokenType)
        {
            case JsonToken.StartObject:
                {
                    (var firstWriter, var firstWriterIndex) = GetCurrentWriter(writers, parentTokens, createStream, formatting);
                    bool prefix = true;
                    bool doRead = true;
                    firstWriter.WriteStartObject();
                    var parentToken = new ParentToken(JsonToken.StartObject, new List<JToken>());
                    while ((doRead ? reader.ReadToContentAndAssert() : reader.MoveToContentAndAssert()).TokenType != JsonToken.EndObject)
                    {
                        doRead = true;
                        var propertyName = (string)reader.AssertTokenType(JsonToken.PropertyName).Value;
                        if (propertyName == splitPath[index])
                        {
                            if (!prefix)
                                throw new JsonException(string.Format("Duplicated property name {0}", propertyName));
                            prefix = false;
                            
                            // Advance reader to value.
                            reader.ReadToContentAndAssert();
                            
                            // Add a token with the current property name but an undefined value.  This indicates an unclosed property.
                            firstWriter.WritePropertyName(propertyName);
                            parentToken.PrefixTokens.Add(new JProperty(propertyName, JValue.CreateUndefined()));
                            
                            parentTokens.Add(parentToken);
                            
                            // SplitJson() leaves the reader positioned ON the end of the token that was read, rather than after.
                            SplitJson(reader, splitPath, index + 1, maxItems, createStream, formatting, parentTokens, writers);
                            parentTokens.RemoveAt(parentTokens.Count-1);
                        }
                        else if (prefix)
                        {
                            // JProperty.Load() leaves the reader positioned AFTER the token that was read, rather than at the end.
                            var property = JProperty.Load(reader);
                            property.WriteTo(firstWriter);
                            parentToken.PrefixTokens.Add(property);
                            doRead = false;
                        }
                        else
                        {
                            var property = JProperty.Load(reader);
                            for (int i = firstWriterIndex; i < writers.Count; i++)
                            {
                                property.WriteTo(writers[i]);
                            }
                            doRead = false;
                        }
                    }
                    for (int i = firstWriterIndex; i < writers.Count; i++)
                    {
                        if (prefix)
                            // We never found the property
                            foreach (var property in parentToken.PrefixTokens)
                                property.WriteTo(writers[i]);
                        writers[i].WriteEndObject();
                    }
                }
                break;
            case JsonToken.StartArray: // Split the array.
                {
                    var maxItemsAtDepth = bottom ? maxItems : 1L;
                    (var writer, var firstWriterIndex) = GetCurrentWriter(writers, parentTokens, createStream, formatting);
                    writer.WriteStartArray();
                    long count = 0L;
                    while (reader.ReadToContentAndAssert().TokenType != JsonToken.EndArray)
                    {
                        if (reader.TokenType == JsonToken.Comment || reader.TokenType == JsonToken.None)
                            continue;
                        if (count >= maxItemsAtDepth)
                        {
                            writer = AddWriter(writers, parentTokens, createStream, formatting);
                            writer.WriteStartArray();
                            count = 0L;
                        }
                        if (bottom)
                            // WriteToken() leaves the reader positioned ON the end of the token that was read, rather than after.
                            writer.WriteToken(reader);
                        else
                        {
                            parentTokens.Add(new ParentToken(JsonToken.StartArray));
                            // SplitJson() leaves the reader positioned ON the end of the token that was read, rather than after.
                            SplitJson(reader, splitPath, index, maxItems, createStream, formatting, parentTokens, writers);
                            parentTokens.RemoveAt(parentTokens.Count-1);
                        }
                        count++;
                    }
                    for (int i = firstWriterIndex; i < writers.Count; i++)
                    {
                        writers[i].WriteEndArray();
                    }
                }
                break;
            default: // null, for instance
                {
                    (var writer, var _) = GetCurrentWriter(writers, parentTokens, createStream, formatting);
                    writer.WriteToken(reader);
                }
                break;
        }
    }       
}

public static partial class JsonExtensions
{
    public static JsonReader AssertTokenType(this JsonReader reader, JsonToken tokenType) => 
        reader.TokenType == tokenType ? reader : throw new JsonSerializationException(string.Format("Unexpected token {0}, expected {1}", reader.TokenType, tokenType));
    
    public static JsonReader ReadToContentAndAssert(this JsonReader reader) =>
        reader.ReadAndAssert().MoveToContentAndAssert();

    public static JsonReader MoveToContentAndAssert(this JsonReader reader)
    {
        if (reader == null)
            throw new ArgumentNullException();
        if (reader.TokenType == JsonToken.None)       // Skip past beginning of stream.
            reader.ReadAndAssert();
        while (reader.TokenType == JsonToken.Comment) // Skip past comments.
            reader.ReadAndAssert();
        return reader;
    }

    public static JsonReader ReadAndAssert(this JsonReader reader)
    {
        if (reader == null)
            throw new ArgumentNullException();
        if (!reader.Read())
            throw new JsonReaderException("Unexpected end of JSON stream.");
        return reader;
    }
}

Then, to split on "Job.Steps[*].InstrumentData[*]", call it as follows:

var fileNames = JsonExtensions.SplitJsonFile(fileName, 
                                             new [] { "Job", "Steps", "InstrumentData" }, 
                                             (directory, name, i, ext) => Path.Combine(directory, Path.ChangeExtension(name + $"_fragment_{i}", ext)));

Or, to split on "Job.Steps[*].InstrumentData[*].Measurements[*]", call it like this:

var fileNames = JsonExtensions.SplitJsonFile(fileName, 
                                             new [] { "Job", "Steps", "InstrumentData", "Measurements" }, 
                                             (directory, name, i, ext) => Path.Combine(directory, Path.ChangeExtension(name + $"_fragment_{i}", ext)));

Demo fiddle here.

I also tested this enhanced version of JsonExtensions.SplitJson() with the JSON from [the earlier question] to verify there were no regressions; there were none. See fiddle #2 here.

Here is another approach: you can split the large JSON file using Cinchoo ETL, an open-source library.

Assuming the input JSON file has Job.Steps[*].InstrumentData[*] nodes, splitting the file by InstrumentData takes two levels of parsing.

First, break the input file up by each Steps[*] node (call these StepsFiles), then take each StepsFile and break it up by each InstrumentData[*] node.

Due to the complexity of the code, a sample fiddle has been put together for review.

Sample fiddle: https://dotnetfiddle.net/j3Y03m
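For orientation only, the first pass might look roughly like the sketch below. This is not the code from the fiddle; it assumes Cinchoo ETL's ChoJSONReader/ChoJSONWriter with JSONPath support, and the file paths are placeholders:

using ChoETL; // Cinchoo ETL NuGet package

int i = 0;
// Pass 1: stream each Job.Steps[*] node out of the huge input into its own "StepsFile".
using (var reader = new ChoJSONReader(@"C:\Temp\measurements.json").WithJSONPath("$.Job.Steps[*]"))
{
    foreach (dynamic step in reader)
    {
        using (var writer = new ChoJSONWriter($@"C:\Temp\step_{i++}.json"))
            writer.Write(step);
    }
}
// Pass 2 would apply the same idea to each step file with the JSONPath "$.InstrumentData[*]".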

My solution is here: https://dotnetfiddle.net/CufM4w

The idea is to process the document in two passes using a relatively simple state machine:

  1. Extract the root template and the per-step templates (prefix and suffix), skipping InstrumentData entirely.

  2. Do the reverse: skip everything except InstrumentData and assemble the output using the parts from #1.

So the main code looks like this (see the link above for the full source):

    var reader = new StringReader(JsonText);
    var splitter = DataSplitter.Create(reader);

    reader = new StringReader(JsonText);
    splitter.Split(reader, (step, index, content) => {
        Console.WriteLine("=== step: {0}, index: {1} ===", step, index);
        Console.WriteLine(content);
    });
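For a 16 GB input you would read from the file rather than an in-memory string, and write each fragment to disk instead of the console. A sketch under the assumption that DataSplitter.Create() and Split() (from the linked fiddle) accept any TextReader; the paths are placeholders:

using (var reader = new StreamReader(@"C:\Temp\measurements.json"))
using (var reader2 = new StreamReader(@"C:\Temp\measurements.json"))
{
    var splitter = DataSplitter.Create(reader);            // pass 1: collect the templates
    splitter.Split(reader2, (step, index, content) =>      // pass 2: one file per InstrumentData member
        File.WriteAllText($@"C:\Temp\output_{step}_{index}.json", content));
}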

Hope this helps.