在 ASP.NET WebAPI 2 中流式传输期间在 JSON 流中一个接一个地处理 JSON 个对象

Processing JSON objects one by one in JSON stream during streaming in ASP.NET WebAPI 2

我正在尝试流式传输大型 JSON 文件并在流式传输期间逐项反序列化。

我正在使用此测试 https://raw.githubusercontent.com/ysharplanguage/FastJsonParser/master/JsonTest/TestData/fathers.json.txt

问题是我没有收到错误,但我的代码似乎没有一个一个地处理项目,甚至没有处理 JSON 个对象。

我卡住了,真的不知道如何实现逐个对象处理流的部分。

这是我的代码:

using Newtonsoft.Json;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Web.Http;
using System.Threading.Tasks;

namespace AMServices.Controllers
{
    public class FathersData
    {
        public Father[] fathers { get; set; }
    }

    public class Someone
    {
        public string name { get; set; }
    }

    public class Father : Someone
    {
        public int id { get; set; }
        public bool married { get; set; }
        // Lists...
        public List<Son> sons { get; set; }
        // ... or arrays for collections, that's fine:
        public Daughter[] daughters { get; set; }
    }

    public class Child : Someone
    {
        public int age { get; set; }
    }

    public class Son : Child
    {
    }

    public class Daughter : Child
    {
        public string maidenName { get; set; }
    }

    public class StreamerController : ApiController
    {
        static readonly JsonSerializer _serializer = new JsonSerializer();
        static readonly HttpClient _client = new HttpClient();

        [HttpPost]
        [Route("streamer/stream")]
        public async Task<IHttpActionResult> stream()
        {
            string apiUrl = "https://raw.githubusercontent.com/ysharplanguage/FastJsonParser/master/JsonTest/TestData/fathers.json.txt";

            using (var stream = await _client.GetStreamAsync(apiUrl).ConfigureAwait(false))
            using (var reader = new StreamReader(stream))
            using (var json = new JsonTextReader(reader))
            {
                if (json == null)
                    StatusCode(HttpStatusCode.InternalServerError);

                JsonSerializer serializer = new JsonSerializer();

                Father f = serializer.Deserialize<Father>(json);
                System.Diagnostics.Debug.WriteLine(f.name);    
            }

            return StatusCode(HttpStatusCode.OK);
        }
    }
}

更新

我已经将using (var json = new JsonTextReader(reader))修改为

while (json.Read())
{
    if (json.TokenType == JsonToken.StartObject)
    {
        JObject objX = JObject.Load(json);
        Father f = objX.ToObject<Father>();

        System.Diagnostics.Debug.WriteLine("Name -> " + f.name);
    }
}

如何更改我可以流式传输以及在此过程中逐项更改?

您可以尝试添加 RootObject 并包含 List<Father> 属性 作为您的反序列化对象,因为您的 URL JSON 数据 "father" 键包含一个数组而不是一个对象。

public class RootObject
{
    public List<Father> fathers { get; set; }
}

Api 像这样使用。

public class StreamerController : ApiController
{
    static readonly JsonSerializer _serializer = new JsonSerializer();
    static readonly HttpClient _client = new HttpClient();

    [HttpPost]
    [Route("streamer/stream")]
    public async Task<IHttpActionResult> stream()
    {
        string apiUrl = "https://raw.githubusercontent.com/ysharplanguage/FastJsonParser/master/JsonTest/TestData/fathers.json.txt";

        using (var stream = await _client.GetStreamAsync(apiUrl).ConfigureAwait(false))
        using (var reader = new StreamReader(stream))
        using (var json = new JsonTextReader(reader))
        {
            if (json == null)
                StatusCode(HttpStatusCode.InternalServerError);

            JsonSerializer serializer = new JsonSerializer();

            RootObject f = serializer.Deserialize<RootObject>(json);  
        }

        return StatusCode(HttpStatusCode.OK);
    }
}

由于 async Enumerable 尚不可用,您可以使用回调系统。在下面的示例中,我将反序列化逻辑包装在 class 中,这将公开一个事件 (FatherReaded).

async Task Main()
{
    await stream();
}

// Define other methods and classes here
public class FathersData
{
    public Father[] Fathers { get; set; }
}

public class Someone
{
    public string Name { get; set; }
}

public class Father : Someone
{
    public int Id { get; set; }
    public bool Married { get; set; }
    // Lists...
    public List<Son> Sons { get; set; }
    // ... or arrays for collections, that's fine:
    public Daughter[] Daughters { get; set; }
}

public class Child : Someone
{
    public int age { get; set; }
}

public class Son : Child
{
}

public class Daughter : Child
{
    public string maidenName { get; set; }
}

public async Task stream()
{

    var fatherReader = new FatherReader();
    fatherReader.FatherReaded += (s, f) => {
        //f.name.Dump();
        System.Diagnostics.Debug.WriteLine(f.Name);
    };

    string apiUrl = "https://raw.githubusercontent.com/ysharplanguage/FastJsonParser/master/JsonTest/TestData/fathers.json.txt";

    using (var client = new HttpClient())
    using (var stream = await client.GetStreamAsync(apiUrl).ConfigureAwait(false))
    {
        fatherReader.Read(stream);
    }
}

public class FatherReader
{
    public event System.EventHandler<Father> FatherReaded;

    public FatherReader()
    {

    }

    private void OnFatherReaded(Father father){
        FatherReaded?.Invoke(this, father);
    }

    public void Read(Stream stream)
    {
        using (var reader = new StreamReader(stream))
        using (var jsonReader = new JsonTextReader(reader))
        {
            JsonSerializer serializer = new JsonSerializer();

            jsonReader.Read(); // Skip the first StartObject token

            while (jsonReader.Read())
            {
                if (jsonReader.TokenType == JsonToken.StartObject)
                {
                    var father = serializer.Deserialize(jsonReader, typeof(Father));
                    OnFatherReaded((Father)father);
                }
            }
        }
    }
}

相同但具有可观察的 (Rx)。 Dump 方法是 shorthand 到 Console.WriteLine(this.ToString)

的扩展
public async Task stream()
{
    var fatherReader = new FatherReader();
    var observable = fatherReader.Observable;

    // Here you can chain many operator like Linq (filtre, transforme, ...)
    observable = observable
        .Where(f => f.Name.StartsWith("J"));

    observable.Subscribe(f => f.Name.Dump(), e => e.ToString().Dump());

    string apiUrl = "https://raw.githubusercontent.com/ysharplanguage/FastJsonParser/master/JsonTest/TestData/fathers.json.txt";

    using (var client = new HttpClient())
    using (var stream = await client.GetStreamAsync(apiUrl).ConfigureAwait(false))
    {
        fatherReader.Read(stream);
    }
}

public class FatherReader
{
    private Subject<Father> _observable = new Subject<Father>();
    public IObservable<Father> Observable => _observable.AsObservable();

    public FatherReader()
    {
    }

    private void OnFatherReaded(Father father)
    {
        _observable.OnNext(father);
    }

    public void Read(Stream stream)
    {
        try
        {
            using (var reader = new StreamReader(stream))
            using (var jsonReader = new JsonTextReader(reader))
            {
                JsonSerializer serializer = new JsonSerializer();

                jsonReader.Read(); // Skip the first StartObject token

                while (jsonReader.Read())
                {
                    if (jsonReader.TokenType == JsonToken.StartObject)
                    {
                        try
                        {
                            var father = serializer.Deserialize(jsonReader, typeof(Father));
                            OnFatherReaded((Father)father);
                        }
                        catch (Exception ex)
                        {
                            _observable.OnError(ex);
                        }
                    }
                }
            }
        }
        catch (Exception ex)
        {
            _observable.OnError(ex);
        }

        _observable.OnCompleted();
    }
}