在 ASP.NET WebAPI 2 中流式传输期间在 JSON 流中一个接一个地处理 JSON 个对象
Processing JSON objects one by one in JSON stream during streaming in ASP.NET WebAPI 2
我正在尝试流式传输大型 JSON 文件并在流式传输期间逐项反序列化。
问题是我没有收到错误,但我的代码似乎没有一个一个地处理项目,甚至没有处理 JSON 个对象。
我卡住了,真的不知道如何实现逐个对象处理流的部分。
这是我的代码:
using Newtonsoft.Json;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Web.Http;
using System.Threading.Tasks;
namespace AMServices.Controllers
{
public class FathersData
{
public Father[] fathers { get; set; }
}
public class Someone
{
public string name { get; set; }
}
public class Father : Someone
{
public int id { get; set; }
public bool married { get; set; }
// Lists...
public List<Son> sons { get; set; }
// ... or arrays for collections, that's fine:
public Daughter[] daughters { get; set; }
}
public class Child : Someone
{
public int age { get; set; }
}
public class Son : Child
{
}
public class Daughter : Child
{
public string maidenName { get; set; }
}
public class StreamerController : ApiController
{
static readonly JsonSerializer _serializer = new JsonSerializer();
static readonly HttpClient _client = new HttpClient();
[HttpPost]
[Route("streamer/stream")]
public async Task<IHttpActionResult> stream()
{
string apiUrl = "https://raw.githubusercontent.com/ysharplanguage/FastJsonParser/master/JsonTest/TestData/fathers.json.txt";
using (var stream = await _client.GetStreamAsync(apiUrl).ConfigureAwait(false))
using (var reader = new StreamReader(stream))
using (var json = new JsonTextReader(reader))
{
if (json == null)
StatusCode(HttpStatusCode.InternalServerError);
JsonSerializer serializer = new JsonSerializer();
Father f = serializer.Deserialize<Father>(json);
System.Diagnostics.Debug.WriteLine(f.name);
}
return StatusCode(HttpStatusCode.OK);
}
}
}
更新
我已经将using (var json = new JsonTextReader(reader))
修改为
while (json.Read())
{
if (json.TokenType == JsonToken.StartObject)
{
JObject objX = JObject.Load(json);
Father f = objX.ToObject<Father>();
System.Diagnostics.Debug.WriteLine("Name -> " + f.name);
}
}
如何更改我可以流式传输以及在此过程中逐项更改?
您可以尝试添加 RootObject
并包含 List<Father>
属性 作为您的反序列化对象,因为您的 URL JSON 数据 "father"
键包含一个数组而不是一个对象。
public class RootObject
{
public List<Father> fathers { get; set; }
}
Api 像这样使用。
public class StreamerController : ApiController
{
static readonly JsonSerializer _serializer = new JsonSerializer();
static readonly HttpClient _client = new HttpClient();
[HttpPost]
[Route("streamer/stream")]
public async Task<IHttpActionResult> stream()
{
string apiUrl = "https://raw.githubusercontent.com/ysharplanguage/FastJsonParser/master/JsonTest/TestData/fathers.json.txt";
using (var stream = await _client.GetStreamAsync(apiUrl).ConfigureAwait(false))
using (var reader = new StreamReader(stream))
using (var json = new JsonTextReader(reader))
{
if (json == null)
StatusCode(HttpStatusCode.InternalServerError);
JsonSerializer serializer = new JsonSerializer();
RootObject f = serializer.Deserialize<RootObject>(json);
}
return StatusCode(HttpStatusCode.OK);
}
}
由于 async Enumerable 尚不可用,您可以使用回调系统。在下面的示例中,我将反序列化逻辑包装在 class 中,这将公开一个事件 (FatherReaded
).
async Task Main()
{
await stream();
}
// Define other methods and classes here
public class FathersData
{
public Father[] Fathers { get; set; }
}
public class Someone
{
public string Name { get; set; }
}
public class Father : Someone
{
public int Id { get; set; }
public bool Married { get; set; }
// Lists...
public List<Son> Sons { get; set; }
// ... or arrays for collections, that's fine:
public Daughter[] Daughters { get; set; }
}
public class Child : Someone
{
public int age { get; set; }
}
public class Son : Child
{
}
public class Daughter : Child
{
public string maidenName { get; set; }
}
public async Task stream()
{
var fatherReader = new FatherReader();
fatherReader.FatherReaded += (s, f) => {
//f.name.Dump();
System.Diagnostics.Debug.WriteLine(f.Name);
};
string apiUrl = "https://raw.githubusercontent.com/ysharplanguage/FastJsonParser/master/JsonTest/TestData/fathers.json.txt";
using (var client = new HttpClient())
using (var stream = await client.GetStreamAsync(apiUrl).ConfigureAwait(false))
{
fatherReader.Read(stream);
}
}
public class FatherReader
{
public event System.EventHandler<Father> FatherReaded;
public FatherReader()
{
}
private void OnFatherReaded(Father father){
FatherReaded?.Invoke(this, father);
}
public void Read(Stream stream)
{
using (var reader = new StreamReader(stream))
using (var jsonReader = new JsonTextReader(reader))
{
JsonSerializer serializer = new JsonSerializer();
jsonReader.Read(); // Skip the first StartObject token
while (jsonReader.Read())
{
if (jsonReader.TokenType == JsonToken.StartObject)
{
var father = serializer.Deserialize(jsonReader, typeof(Father));
OnFatherReaded((Father)father);
}
}
}
}
}
相同但具有可观察的 (Rx)。 Dump
方法是 shorthand 到 Console.WriteLine(this.ToString)
的扩展
public async Task stream()
{
var fatherReader = new FatherReader();
var observable = fatherReader.Observable;
// Here you can chain many operator like Linq (filtre, transforme, ...)
observable = observable
.Where(f => f.Name.StartsWith("J"));
observable.Subscribe(f => f.Name.Dump(), e => e.ToString().Dump());
string apiUrl = "https://raw.githubusercontent.com/ysharplanguage/FastJsonParser/master/JsonTest/TestData/fathers.json.txt";
using (var client = new HttpClient())
using (var stream = await client.GetStreamAsync(apiUrl).ConfigureAwait(false))
{
fatherReader.Read(stream);
}
}
public class FatherReader
{
private Subject<Father> _observable = new Subject<Father>();
public IObservable<Father> Observable => _observable.AsObservable();
public FatherReader()
{
}
private void OnFatherReaded(Father father)
{
_observable.OnNext(father);
}
public void Read(Stream stream)
{
try
{
using (var reader = new StreamReader(stream))
using (var jsonReader = new JsonTextReader(reader))
{
JsonSerializer serializer = new JsonSerializer();
jsonReader.Read(); // Skip the first StartObject token
while (jsonReader.Read())
{
if (jsonReader.TokenType == JsonToken.StartObject)
{
try
{
var father = serializer.Deserialize(jsonReader, typeof(Father));
OnFatherReaded((Father)father);
}
catch (Exception ex)
{
_observable.OnError(ex);
}
}
}
}
}
catch (Exception ex)
{
_observable.OnError(ex);
}
_observable.OnCompleted();
}
}
我正在尝试流式传输大型 JSON 文件并在流式传输期间逐项反序列化。
问题是我没有收到错误,但我的代码似乎没有一个一个地处理项目,甚至没有处理 JSON 个对象。
我卡住了,真的不知道如何实现逐个对象处理流的部分。
这是我的代码:
using Newtonsoft.Json;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Web.Http;
using System.Threading.Tasks;
namespace AMServices.Controllers
{
public class FathersData
{
public Father[] fathers { get; set; }
}
public class Someone
{
public string name { get; set; }
}
public class Father : Someone
{
public int id { get; set; }
public bool married { get; set; }
// Lists...
public List<Son> sons { get; set; }
// ... or arrays for collections, that's fine:
public Daughter[] daughters { get; set; }
}
public class Child : Someone
{
public int age { get; set; }
}
public class Son : Child
{
}
public class Daughter : Child
{
public string maidenName { get; set; }
}
public class StreamerController : ApiController
{
static readonly JsonSerializer _serializer = new JsonSerializer();
static readonly HttpClient _client = new HttpClient();
[HttpPost]
[Route("streamer/stream")]
public async Task<IHttpActionResult> stream()
{
string apiUrl = "https://raw.githubusercontent.com/ysharplanguage/FastJsonParser/master/JsonTest/TestData/fathers.json.txt";
using (var stream = await _client.GetStreamAsync(apiUrl).ConfigureAwait(false))
using (var reader = new StreamReader(stream))
using (var json = new JsonTextReader(reader))
{
if (json == null)
StatusCode(HttpStatusCode.InternalServerError);
JsonSerializer serializer = new JsonSerializer();
Father f = serializer.Deserialize<Father>(json);
System.Diagnostics.Debug.WriteLine(f.name);
}
return StatusCode(HttpStatusCode.OK);
}
}
}
更新
我已经将using (var json = new JsonTextReader(reader))
修改为
while (json.Read())
{
if (json.TokenType == JsonToken.StartObject)
{
JObject objX = JObject.Load(json);
Father f = objX.ToObject<Father>();
System.Diagnostics.Debug.WriteLine("Name -> " + f.name);
}
}
如何更改我可以流式传输以及在此过程中逐项更改?
您可以尝试添加 RootObject
并包含 List<Father>
属性 作为您的反序列化对象,因为您的 URL JSON 数据 "father"
键包含一个数组而不是一个对象。
public class RootObject
{
public List<Father> fathers { get; set; }
}
Api 像这样使用。
public class StreamerController : ApiController
{
static readonly JsonSerializer _serializer = new JsonSerializer();
static readonly HttpClient _client = new HttpClient();
[HttpPost]
[Route("streamer/stream")]
public async Task<IHttpActionResult> stream()
{
string apiUrl = "https://raw.githubusercontent.com/ysharplanguage/FastJsonParser/master/JsonTest/TestData/fathers.json.txt";
using (var stream = await _client.GetStreamAsync(apiUrl).ConfigureAwait(false))
using (var reader = new StreamReader(stream))
using (var json = new JsonTextReader(reader))
{
if (json == null)
StatusCode(HttpStatusCode.InternalServerError);
JsonSerializer serializer = new JsonSerializer();
RootObject f = serializer.Deserialize<RootObject>(json);
}
return StatusCode(HttpStatusCode.OK);
}
}
由于 async Enumerable 尚不可用,您可以使用回调系统。在下面的示例中,我将反序列化逻辑包装在 class 中,这将公开一个事件 (FatherReaded
).
async Task Main()
{
await stream();
}
// Define other methods and classes here
public class FathersData
{
public Father[] Fathers { get; set; }
}
public class Someone
{
public string Name { get; set; }
}
public class Father : Someone
{
public int Id { get; set; }
public bool Married { get; set; }
// Lists...
public List<Son> Sons { get; set; }
// ... or arrays for collections, that's fine:
public Daughter[] Daughters { get; set; }
}
public class Child : Someone
{
public int age { get; set; }
}
public class Son : Child
{
}
public class Daughter : Child
{
public string maidenName { get; set; }
}
public async Task stream()
{
var fatherReader = new FatherReader();
fatherReader.FatherReaded += (s, f) => {
//f.name.Dump();
System.Diagnostics.Debug.WriteLine(f.Name);
};
string apiUrl = "https://raw.githubusercontent.com/ysharplanguage/FastJsonParser/master/JsonTest/TestData/fathers.json.txt";
using (var client = new HttpClient())
using (var stream = await client.GetStreamAsync(apiUrl).ConfigureAwait(false))
{
fatherReader.Read(stream);
}
}
public class FatherReader
{
public event System.EventHandler<Father> FatherReaded;
public FatherReader()
{
}
private void OnFatherReaded(Father father){
FatherReaded?.Invoke(this, father);
}
public void Read(Stream stream)
{
using (var reader = new StreamReader(stream))
using (var jsonReader = new JsonTextReader(reader))
{
JsonSerializer serializer = new JsonSerializer();
jsonReader.Read(); // Skip the first StartObject token
while (jsonReader.Read())
{
if (jsonReader.TokenType == JsonToken.StartObject)
{
var father = serializer.Deserialize(jsonReader, typeof(Father));
OnFatherReaded((Father)father);
}
}
}
}
}
相同但具有可观察的 (Rx)。 Dump
方法是 shorthand 到 Console.WriteLine(this.ToString)
public async Task stream()
{
var fatherReader = new FatherReader();
var observable = fatherReader.Observable;
// Here you can chain many operator like Linq (filtre, transforme, ...)
observable = observable
.Where(f => f.Name.StartsWith("J"));
observable.Subscribe(f => f.Name.Dump(), e => e.ToString().Dump());
string apiUrl = "https://raw.githubusercontent.com/ysharplanguage/FastJsonParser/master/JsonTest/TestData/fathers.json.txt";
using (var client = new HttpClient())
using (var stream = await client.GetStreamAsync(apiUrl).ConfigureAwait(false))
{
fatherReader.Read(stream);
}
}
public class FatherReader
{
private Subject<Father> _observable = new Subject<Father>();
public IObservable<Father> Observable => _observable.AsObservable();
public FatherReader()
{
}
private void OnFatherReaded(Father father)
{
_observable.OnNext(father);
}
public void Read(Stream stream)
{
try
{
using (var reader = new StreamReader(stream))
using (var jsonReader = new JsonTextReader(reader))
{
JsonSerializer serializer = new JsonSerializer();
jsonReader.Read(); // Skip the first StartObject token
while (jsonReader.Read())
{
if (jsonReader.TokenType == JsonToken.StartObject)
{
try
{
var father = serializer.Deserialize(jsonReader, typeof(Father));
OnFatherReaded((Father)father);
}
catch (Exception ex)
{
_observable.OnError(ex);
}
}
}
}
}
catch (Exception ex)
{
_observable.OnError(ex);
}
_observable.OnCompleted();
}
}