使用 System.Text.Json 异步反序列化列表
Asynchonously deserializing a list using System.Text.Json
假设我请求一个大型 json 文件,其中包含许多对象的列表。我不希望它们一下子全部在内存中,而是我宁愿一个一个地阅读和处理它们。所以我需要将异步 System.IO.Stream
流转换为 IAsyncEnumerable<T>
。我如何使用新的 System.Text.Json
API 来做到这一点?
private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
{
using (var stream = await httpResponse.Content.ReadAsStreamAsync())
{
// Probably do something with JsonSerializer.DeserializeAsync here without serializing the entire thing in one go
}
}
}
是的,一个真正的流式 JSON(反)序列化器将是一个很好的性能改进,在很多地方。
不幸的是,System.Text.Json
在我写这篇文章的时候并没有这样做。我不确定将来是否会 - 我希望如此! JSON 的真正流式反序列化结果相当具有挑战性。
你可以看看极速Utf8Json是否支持,也许。
但是,对于您的具体情况,可能会有自定义解决方案,因为您的要求似乎限制了难度。
想法是一次从数组中手动读取一项。我们正在利用列表中的每个项目本身都是有效的 JSON 对象这一事实。
您可以手动跳过 [
(第一个项目)或 ,
(每个下一个项目)。那么我认为你最好的选择是使用 .NET Core 的 Utf8JsonReader
来确定当前对象的结束位置,并将扫描的字节提供给 JsonDeserializer
.
这样,您一次只能在一个对象上稍微缓冲。
由于我们在谈论性能,您可以在使用时从 PipeReader
获得输入。 :-)
也许你可以使用 Newtonsoft.Json
序列化程序?
https://www.newtonsoft.com/json/help/html/Performance.htm
特别看部分:
Optimize Memory Usage
编辑
您可以尝试反序列化来自 JsonTextReader 的值,例如
using (var textReader = new StreamReader(stream))
using (var reader = new JsonTextReader(textReader))
{
while (await reader.ReadAsync(cancellationToken))
{
yield return reader.Value;
}
}
感觉你需要实现自己的流reader。您必须逐个读取字节并在对象定义完成后立即停止。它确实很低级。因此,您不会将整个文件加载到 RAM 中,而是选择您正在处理的部分。好像是答案?
TL;DR 不简单
看起来有人 已经 for a Utf8JsonStreamReader
struct that reads buffers from a stream and feeds them to a Utf8JsonRreader, allowing easy deserialization with JsonSerializer.Deserialize<T>(ref newJsonReader, options);
. The code isn't trivial either. The related question is and the answer is .
但这还不够 - HttpClient.GetAsync
仅在收到整个响应后才会 return,本质上是在内存中缓冲所有内容。
为避免这种情况,HttpClient.GetAsync(string,HttpCompletionOption ) 应与 HttpCompletionOption.ResponseHeadersRead
一起使用。
反序列化循环也应该检查取消标记,如果收到信号则退出或抛出。否则循环将继续,直到接收并处理整个流。
此代码基于相关答案的示例并使用 HttpCompletionOption.ResponseHeadersRead
并检查取消令牌。它可以解析包含适当项目数组的 JSON 字符串,例如:
[{"prop1":123},{"prop1":234}]
第一次调用 jsonStreamReader.Read()
移动到数组的开头,而第二次调用移动到第一个对象的开头。当检测到数组末尾 (]
) 时,循环本身终止。
private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
//Don't cache the entire response
using var httpResponse = await httpClient.GetAsync(url,
HttpCompletionOption.ResponseHeadersRead,
cancellationToken);
using var stream = await httpResponse.Content.ReadAsStreamAsync();
using var jsonStreamReader = new Utf8JsonStreamReader(stream, 32 * 1024);
jsonStreamReader.Read(); // move to array start
jsonStreamReader.Read(); // move to start of the object
while (jsonStreamReader.TokenType != JsonTokenType.EndArray)
{
//Gracefully return if cancellation is requested.
//Could be cancellationToken.ThrowIfCancellationRequested()
if(cancellationToken.IsCancellationRequested)
{
return;
}
// deserialize object
var obj = jsonStreamReader.Deserialize<T>();
yield return obj;
// JsonSerializer.Deserialize ends on last token of the object parsed,
// move to the first token of next object
jsonStreamReader.Read();
}
}
JSON 片段,又名流 JSON 又名 ...*
在事件流或日志记录场景中,将单个 JSON 对象附加到文件中是很常见的,每行一个元素,例如:
{"eventId":1}
{"eventId":2}
...
{"eventId":1234567}
这不是有效的 JSON 文档 但各个片段都是有效的。对于大型 data/highly 并发场景,这有几个优点。添加新事件只需要在文件中追加一个新行,而不需要解析和重建整个文件。 处理,尤其是并行处理更容易,原因有二:
- 可以一次检索一个单独的元素,只需从流中读取一行即可。
- 输入文件可以很容易地跨行边界进行分区和拆分,将每个部分提供给一个单独的工作进程,例如在 Hadoop 集群中,或者只是应用程序中的不同线程:计算拆分点,例如通过除以长度按工人数量,然后寻找第一个换行符。将所有的东西都交给一个单独的工人。
使用 StreamReader
执行此操作的分配方式是使用 TextReader,一次读取一行并使用 JsonSerializer.Deserialize 解析它:
using var reader=new StreamReader(stream);
string line;
//ReadLineAsync() doesn't accept a CancellationToken
while((line=await reader.ReadLineAsync()) != null)
{
var item=JsonSerializer.Deserialize<T>(line);
yield return item;
if(cancellationToken.IsCancellationRequested)
{
return;
}
}
这比反序列化适当数组的代码简单得多。有两个问题:
ReadLineAsync
不接受取消令牌
- 每次迭代都会分配一个新字符串,这是我们希望通过使用 System.Text.Json
避免 的事情之一
虽然这可能就足够了,因为尝试生成 JsonSerializer.Deserialize 所需的 ReadOnlySpan<Byte>
缓冲区并非易事。
管道和 SequenceReader
为避免分配位置,我们需要从流中获取 ReadOnlySpan<byte>
。这样做需要使用 System.IO.Pipeline 管道,SequenceReader struct. Steve Gordon's An Introduction to SequenceReader 解释了如何使用此 class 来使用定界符从流中读取数据。
不幸的是,SequenceReader
是一个引用结构,这意味着它不能在异步或本地方法中使用。这就是为什么 Steve Gordon 在他的文章中创建了一个
private static SequencePosition ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)
从 ReadOnlySequence 和 return 结束位置读取项目的方法,因此 PipeReader 可以从中恢复。 不幸的是 我们想要 return 一个 IEnumerable 或 IAsyncEnumerable,并且迭代器方法也不喜欢 in
或 out
参数。
我们可以将反序列化的项目收集到列表或队列中,并将它们 return 作为单个结果,但这仍然会分配列表、缓冲区或节点,并且必须等待缓冲区中的所有项目都完成在 returning 之前反序列化:
private static (SequencePosition,List<T>) ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)
我们需要一些东西,它像一个可枚举的东西一样不需要迭代器方法,可以与异步一起工作并且不会以任何方式缓冲所有内容。
添加通道以生成 IAsyncEnumerable
ChannelReader.ReadAllAsync returns 一个 IAsyncEnumerable。我们可以 return 从不能作为迭代器工作的方法中得到一个 ChannelReader,并且仍然在没有缓存的情况下生成元素流。
调整 Steve Gordon 的代码以使用频道,我们得到 ReadItems(ChannelWriter...) 和 ReadLastItem
方法。第一个,一次读取一个项目,使用 ReadOnlySpan<byte> itemBytes
最多换行。这可以被 JsonSerializer.Deserialize
使用。如果 ReadItems
找不到分隔符,它会 return 确定它的位置,这样 PipelineReader 就可以从流中提取下一个块。
当我们到达最后一个块并且没有其他分隔符时,ReadLastItem` 读取剩余的字节并反序列化它们。
该代码与史蒂夫戈登的代码几乎相同。我们不写入控制台,而是写入 ChannelWriter。
private const byte NL=(byte)'\n';
private const int MaxStackLength = 128;
private static SequencePosition ReadItems<T>(ChannelWriter<T> writer, in ReadOnlySequence<byte> sequence,
bool isCompleted, CancellationToken token)
{
var reader = new SequenceReader<byte>(sequence);
while (!reader.End && !token.IsCancellationRequested) // loop until we've read the entire sequence
{
if (reader.TryReadTo(out ReadOnlySpan<byte> itemBytes, NL, advancePastDelimiter: true)) // we have an item to handle
{
var item=JsonSerializer.Deserialize<T>(itemBytes);
writer.TryWrite(item);
}
else if (isCompleted) // read last item which has no final delimiter
{
var item = ReadLastItem<T>(sequence.Slice(reader.Position));
writer.TryWrite(item);
reader.Advance(sequence.Length); // advance reader to the end
}
else // no more items in this sequence
{
break;
}
}
return reader.Position;
}
private static T ReadLastItem<T>(in ReadOnlySequence<byte> sequence)
{
var length = (int)sequence.Length;
if (length < MaxStackLength) // if the item is small enough we'll stack allocate the buffer
{
Span<byte> byteBuffer = stackalloc byte[length];
sequence.CopyTo(byteBuffer);
var item=JsonSerializer.Deserialize<T>(byteBuffer);
return item;
}
else // otherwise we'll rent an array to use as the buffer
{
var byteBuffer = ArrayPool<byte>.Shared.Rent(length);
try
{
sequence.CopyTo(byteBuffer);
var item=JsonSerializer.Deserialize<T>(byteBuffer);
return item;
}
finally
{
ArrayPool<byte>.Shared.Return(byteBuffer);
}
}
}
DeserializeToChannel<T>
方法在流的顶部创建一个管道 reader,创建一个通道并启动一个工作任务来解析块并将它们推送到通道:
ChannelReader<T> DeserializeToChannel<T>(Stream stream, CancellationToken token)
{
var pipeReader = PipeReader.Create(stream);
var channel=Channel.CreateUnbounded<T>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
while (!token.IsCancellationRequested)
{
var result = await pipeReader.ReadAsync(token); // read from the pipe
var buffer = result.Buffer;
var position = ReadItems(writer,buffer, result.IsCompleted,token); // read complete items from the current buffer
if (result.IsCompleted)
break; // exit if we've read everything from the pipe
pipeReader.AdvanceTo(position, buffer.End); //advance our position in the pipe
}
pipeReader.Complete();
},token)
.ContinueWith(t=>{
pipeReader.Complete();
writer.TryComplete(t.Exception);
});
return channel.Reader;
}
ChannelReader.ReceiveAllAsync()
可用于通过 IAsyncEnumerable<T>
:
消耗所有物品
var reader=DeserializeToChannel<MyEvent>(stream,cts.Token);
await foreach(var item in reader.ReadAllAsync(cts.Token))
{
//Do something with it
}
可以在 .NET 5 (C# 9) 中使用 System.IO.Pipelines
扩展包和 System.Text.Json.JsonSerializer
,而不是通过 ChannelReader
使用多个任务,如下所示:
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
class Program
{
static readonly byte[] NewLineChars = {(byte)'\r', (byte)'\n'};
static readonly byte[] WhiteSpaceChars = {(byte)'\r', (byte)'\n', (byte)' ', (byte)'\t'};
private static async Task Main()
{
JsonSerializerOptions jsonOptions = new(JsonSerializerDefaults.Web);
var json = "{\"some\":\"thing1\"}\r\n{\"some\":\"thing2\"}\r\n{\"some\":\"thing3\"}";
var contentStream = new MemoryStream(Encoding.UTF8.GetBytes(json));
var pipeReader = PipeReader.Create(contentStream);
await foreach (var foo in ReadItemsAsync<Foo>(pipeReader, jsonOptions))
{
Console.WriteLine($"foo: {foo.Some}");
}
}
static async IAsyncEnumerable<TValue> ReadItemsAsync<TValue>(PipeReader pipeReader, JsonSerializerOptions jsonOptions = null)
{
while (true)
{
var result = await pipeReader.ReadAsync();
var buffer = result.Buffer;
bool isCompleted = result.IsCompleted;
SequencePosition bufferPosition = buffer.Start;
while (true)
{
var(value, advanceSequence) = TryReadNextItem<TValue>(buffer, ref bufferPosition, isCompleted, jsonOptions);
if (value != null)
{
yield return value;
}
if (advanceSequence)
{
pipeReader.AdvanceTo(bufferPosition, buffer.End); //advance our position in the pipe
break;
}
}
if (isCompleted)
yield break;
}
}
static (TValue, bool) TryReadNextItem<TValue>(ReadOnlySequence<byte> sequence, ref SequencePosition sequencePosition, bool isCompleted, JsonSerializerOptions jsonOptions)
{
var reader = new SequenceReader<byte>(sequence.Slice(sequencePosition));
while (!reader.End) // loop until we've come to the end or read an item
{
if (reader.TryReadToAny(out ReadOnlySpan<byte> itemBytes, NewLineChars, advancePastDelimiter: true))
{
sequencePosition = reader.Position;
if (itemBytes.TrimStart(WhiteSpaceChars).IsEmpty)
{
continue;
}
return (JsonSerializer.Deserialize<TValue>(itemBytes, jsonOptions), false);
}
else if (isCompleted)
{
// read last item
var remainingReader = sequence.Slice(reader.Position);
using var memoryOwner = MemoryPool<byte>.Shared.Rent((int)reader.Remaining);
remainingReader.CopyTo(memoryOwner.Memory.Span);
reader.Advance(remainingReader.Length); // advance reader to the end
sequencePosition = reader.Position;
if (!itemBytes.TrimStart(WhiteSpaceChars).IsEmpty)
{
return (JsonSerializer.Deserialize<TValue>(memoryOwner.Memory.Span, jsonOptions), true);
}
else
{
return (default, true);
}
}
else
{
// no more items in sequence
break;
}
}
// PipeReader needs to read more
return (default, true);
}
}
public class Foo
{
public string Some
{
get;
set;
}
}
我知道这是一个旧的 post,但是最近在 .Net 6 Preview 4 中发布的 System.Text.Json support for IAsyncEnumerable
提供了 OP 中提到的问题的解决方案。
private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
{
using (var stream = await httpResponse.Content.ReadAsStreamAsync())
{
await foreach(var item in JsonSerializer.DeserializeAsyncEnumerable<T>(stream))
{
yield return item;
}
}
}
}
这将提供按需反序列化,在处理大数据时非常有用。请注意,目前该功能仅限于根级别 JSON 数组。
可以找到有关该功能的更多详细信息here
假设我请求一个大型 json 文件,其中包含许多对象的列表。我不希望它们一下子全部在内存中,而是我宁愿一个一个地阅读和处理它们。所以我需要将异步 System.IO.Stream
流转换为 IAsyncEnumerable<T>
。我如何使用新的 System.Text.Json
API 来做到这一点?
private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
{
using (var stream = await httpResponse.Content.ReadAsStreamAsync())
{
// Probably do something with JsonSerializer.DeserializeAsync here without serializing the entire thing in one go
}
}
}
是的,一个真正的流式 JSON(反)序列化器将是一个很好的性能改进,在很多地方。
不幸的是,System.Text.Json
在我写这篇文章的时候并没有这样做。我不确定将来是否会 - 我希望如此! JSON 的真正流式反序列化结果相当具有挑战性。
你可以看看极速Utf8Json是否支持,也许。
但是,对于您的具体情况,可能会有自定义解决方案,因为您的要求似乎限制了难度。
想法是一次从数组中手动读取一项。我们正在利用列表中的每个项目本身都是有效的 JSON 对象这一事实。
您可以手动跳过 [
(第一个项目)或 ,
(每个下一个项目)。那么我认为你最好的选择是使用 .NET Core 的 Utf8JsonReader
来确定当前对象的结束位置,并将扫描的字节提供给 JsonDeserializer
.
这样,您一次只能在一个对象上稍微缓冲。
由于我们在谈论性能,您可以在使用时从 PipeReader
获得输入。 :-)
也许你可以使用 Newtonsoft.Json
序列化程序?
https://www.newtonsoft.com/json/help/html/Performance.htm
特别看部分:
Optimize Memory Usage
编辑
您可以尝试反序列化来自 JsonTextReader 的值,例如
using (var textReader = new StreamReader(stream))
using (var reader = new JsonTextReader(textReader))
{
while (await reader.ReadAsync(cancellationToken))
{
yield return reader.Value;
}
}
感觉你需要实现自己的流reader。您必须逐个读取字节并在对象定义完成后立即停止。它确实很低级。因此,您不会将整个文件加载到 RAM 中,而是选择您正在处理的部分。好像是答案?
TL;DR 不简单
看起来有人 已经 Utf8JsonStreamReader
struct that reads buffers from a stream and feeds them to a Utf8JsonRreader, allowing easy deserialization with JsonSerializer.Deserialize<T>(ref newJsonReader, options);
. The code isn't trivial either. The related question is
但这还不够 - HttpClient.GetAsync
仅在收到整个响应后才会 return,本质上是在内存中缓冲所有内容。
为避免这种情况,HttpClient.GetAsync(string,HttpCompletionOption ) 应与 HttpCompletionOption.ResponseHeadersRead
一起使用。
反序列化循环也应该检查取消标记,如果收到信号则退出或抛出。否则循环将继续,直到接收并处理整个流。
此代码基于相关答案的示例并使用 HttpCompletionOption.ResponseHeadersRead
并检查取消令牌。它可以解析包含适当项目数组的 JSON 字符串,例如:
[{"prop1":123},{"prop1":234}]
第一次调用 jsonStreamReader.Read()
移动到数组的开头,而第二次调用移动到第一个对象的开头。当检测到数组末尾 (]
) 时,循环本身终止。
private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
//Don't cache the entire response
using var httpResponse = await httpClient.GetAsync(url,
HttpCompletionOption.ResponseHeadersRead,
cancellationToken);
using var stream = await httpResponse.Content.ReadAsStreamAsync();
using var jsonStreamReader = new Utf8JsonStreamReader(stream, 32 * 1024);
jsonStreamReader.Read(); // move to array start
jsonStreamReader.Read(); // move to start of the object
while (jsonStreamReader.TokenType != JsonTokenType.EndArray)
{
//Gracefully return if cancellation is requested.
//Could be cancellationToken.ThrowIfCancellationRequested()
if(cancellationToken.IsCancellationRequested)
{
return;
}
// deserialize object
var obj = jsonStreamReader.Deserialize<T>();
yield return obj;
// JsonSerializer.Deserialize ends on last token of the object parsed,
// move to the first token of next object
jsonStreamReader.Read();
}
}
JSON 片段,又名流 JSON 又名 ...*
在事件流或日志记录场景中,将单个 JSON 对象附加到文件中是很常见的,每行一个元素,例如:
{"eventId":1}
{"eventId":2}
...
{"eventId":1234567}
这不是有效的 JSON 文档 但各个片段都是有效的。对于大型 data/highly 并发场景,这有几个优点。添加新事件只需要在文件中追加一个新行,而不需要解析和重建整个文件。 处理,尤其是并行处理更容易,原因有二:
- 可以一次检索一个单独的元素,只需从流中读取一行即可。
- 输入文件可以很容易地跨行边界进行分区和拆分,将每个部分提供给一个单独的工作进程,例如在 Hadoop 集群中,或者只是应用程序中的不同线程:计算拆分点,例如通过除以长度按工人数量,然后寻找第一个换行符。将所有的东西都交给一个单独的工人。
使用 StreamReader
执行此操作的分配方式是使用 TextReader,一次读取一行并使用 JsonSerializer.Deserialize 解析它:
using var reader=new StreamReader(stream);
string line;
//ReadLineAsync() doesn't accept a CancellationToken
while((line=await reader.ReadLineAsync()) != null)
{
var item=JsonSerializer.Deserialize<T>(line);
yield return item;
if(cancellationToken.IsCancellationRequested)
{
return;
}
}
这比反序列化适当数组的代码简单得多。有两个问题:
ReadLineAsync
不接受取消令牌- 每次迭代都会分配一个新字符串,这是我们希望通过使用 System.Text.Json 避免 的事情之一
虽然这可能就足够了,因为尝试生成 JsonSerializer.Deserialize 所需的 ReadOnlySpan<Byte>
缓冲区并非易事。
管道和 SequenceReader
为避免分配位置,我们需要从流中获取 ReadOnlySpan<byte>
。这样做需要使用 System.IO.Pipeline 管道,SequenceReader struct. Steve Gordon's An Introduction to SequenceReader 解释了如何使用此 class 来使用定界符从流中读取数据。
不幸的是,SequenceReader
是一个引用结构,这意味着它不能在异步或本地方法中使用。这就是为什么 Steve Gordon 在他的文章中创建了一个
private static SequencePosition ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)
从 ReadOnlySequence 和 return 结束位置读取项目的方法,因此 PipeReader 可以从中恢复。 不幸的是 我们想要 return 一个 IEnumerable 或 IAsyncEnumerable,并且迭代器方法也不喜欢 in
或 out
参数。
我们可以将反序列化的项目收集到列表或队列中,并将它们 return 作为单个结果,但这仍然会分配列表、缓冲区或节点,并且必须等待缓冲区中的所有项目都完成在 returning 之前反序列化:
private static (SequencePosition,List<T>) ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)
我们需要一些东西,它像一个可枚举的东西一样不需要迭代器方法,可以与异步一起工作并且不会以任何方式缓冲所有内容。
添加通道以生成 IAsyncEnumerable
ChannelReader.ReadAllAsync returns 一个 IAsyncEnumerable。我们可以 return 从不能作为迭代器工作的方法中得到一个 ChannelReader,并且仍然在没有缓存的情况下生成元素流。
调整 Steve Gordon 的代码以使用频道,我们得到 ReadItems(ChannelWriter...) 和 ReadLastItem
方法。第一个,一次读取一个项目,使用 ReadOnlySpan<byte> itemBytes
最多换行。这可以被 JsonSerializer.Deserialize
使用。如果 ReadItems
找不到分隔符,它会 return 确定它的位置,这样 PipelineReader 就可以从流中提取下一个块。
当我们到达最后一个块并且没有其他分隔符时,ReadLastItem` 读取剩余的字节并反序列化它们。
该代码与史蒂夫戈登的代码几乎相同。我们不写入控制台,而是写入 ChannelWriter。
private const byte NL=(byte)'\n';
private const int MaxStackLength = 128;
private static SequencePosition ReadItems<T>(ChannelWriter<T> writer, in ReadOnlySequence<byte> sequence,
bool isCompleted, CancellationToken token)
{
var reader = new SequenceReader<byte>(sequence);
while (!reader.End && !token.IsCancellationRequested) // loop until we've read the entire sequence
{
if (reader.TryReadTo(out ReadOnlySpan<byte> itemBytes, NL, advancePastDelimiter: true)) // we have an item to handle
{
var item=JsonSerializer.Deserialize<T>(itemBytes);
writer.TryWrite(item);
}
else if (isCompleted) // read last item which has no final delimiter
{
var item = ReadLastItem<T>(sequence.Slice(reader.Position));
writer.TryWrite(item);
reader.Advance(sequence.Length); // advance reader to the end
}
else // no more items in this sequence
{
break;
}
}
return reader.Position;
}
private static T ReadLastItem<T>(in ReadOnlySequence<byte> sequence)
{
var length = (int)sequence.Length;
if (length < MaxStackLength) // if the item is small enough we'll stack allocate the buffer
{
Span<byte> byteBuffer = stackalloc byte[length];
sequence.CopyTo(byteBuffer);
var item=JsonSerializer.Deserialize<T>(byteBuffer);
return item;
}
else // otherwise we'll rent an array to use as the buffer
{
var byteBuffer = ArrayPool<byte>.Shared.Rent(length);
try
{
sequence.CopyTo(byteBuffer);
var item=JsonSerializer.Deserialize<T>(byteBuffer);
return item;
}
finally
{
ArrayPool<byte>.Shared.Return(byteBuffer);
}
}
}
DeserializeToChannel<T>
方法在流的顶部创建一个管道 reader,创建一个通道并启动一个工作任务来解析块并将它们推送到通道:
ChannelReader<T> DeserializeToChannel<T>(Stream stream, CancellationToken token)
{
var pipeReader = PipeReader.Create(stream);
var channel=Channel.CreateUnbounded<T>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
while (!token.IsCancellationRequested)
{
var result = await pipeReader.ReadAsync(token); // read from the pipe
var buffer = result.Buffer;
var position = ReadItems(writer,buffer, result.IsCompleted,token); // read complete items from the current buffer
if (result.IsCompleted)
break; // exit if we've read everything from the pipe
pipeReader.AdvanceTo(position, buffer.End); //advance our position in the pipe
}
pipeReader.Complete();
},token)
.ContinueWith(t=>{
pipeReader.Complete();
writer.TryComplete(t.Exception);
});
return channel.Reader;
}
ChannelReader.ReceiveAllAsync()
可用于通过 IAsyncEnumerable<T>
:
var reader=DeserializeToChannel<MyEvent>(stream,cts.Token);
await foreach(var item in reader.ReadAllAsync(cts.Token))
{
//Do something with it
}
可以在 .NET 5 (C# 9) 中使用 System.IO.Pipelines
扩展包和 System.Text.Json.JsonSerializer
,而不是通过 ChannelReader
使用多个任务,如下所示:
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
class Program
{
static readonly byte[] NewLineChars = {(byte)'\r', (byte)'\n'};
static readonly byte[] WhiteSpaceChars = {(byte)'\r', (byte)'\n', (byte)' ', (byte)'\t'};
private static async Task Main()
{
JsonSerializerOptions jsonOptions = new(JsonSerializerDefaults.Web);
var json = "{\"some\":\"thing1\"}\r\n{\"some\":\"thing2\"}\r\n{\"some\":\"thing3\"}";
var contentStream = new MemoryStream(Encoding.UTF8.GetBytes(json));
var pipeReader = PipeReader.Create(contentStream);
await foreach (var foo in ReadItemsAsync<Foo>(pipeReader, jsonOptions))
{
Console.WriteLine($"foo: {foo.Some}");
}
}
static async IAsyncEnumerable<TValue> ReadItemsAsync<TValue>(PipeReader pipeReader, JsonSerializerOptions jsonOptions = null)
{
while (true)
{
var result = await pipeReader.ReadAsync();
var buffer = result.Buffer;
bool isCompleted = result.IsCompleted;
SequencePosition bufferPosition = buffer.Start;
while (true)
{
var(value, advanceSequence) = TryReadNextItem<TValue>(buffer, ref bufferPosition, isCompleted, jsonOptions);
if (value != null)
{
yield return value;
}
if (advanceSequence)
{
pipeReader.AdvanceTo(bufferPosition, buffer.End); //advance our position in the pipe
break;
}
}
if (isCompleted)
yield break;
}
}
static (TValue, bool) TryReadNextItem<TValue>(ReadOnlySequence<byte> sequence, ref SequencePosition sequencePosition, bool isCompleted, JsonSerializerOptions jsonOptions)
{
var reader = new SequenceReader<byte>(sequence.Slice(sequencePosition));
while (!reader.End) // loop until we've come to the end or read an item
{
if (reader.TryReadToAny(out ReadOnlySpan<byte> itemBytes, NewLineChars, advancePastDelimiter: true))
{
sequencePosition = reader.Position;
if (itemBytes.TrimStart(WhiteSpaceChars).IsEmpty)
{
continue;
}
return (JsonSerializer.Deserialize<TValue>(itemBytes, jsonOptions), false);
}
else if (isCompleted)
{
// read last item
var remainingReader = sequence.Slice(reader.Position);
using var memoryOwner = MemoryPool<byte>.Shared.Rent((int)reader.Remaining);
remainingReader.CopyTo(memoryOwner.Memory.Span);
reader.Advance(remainingReader.Length); // advance reader to the end
sequencePosition = reader.Position;
if (!itemBytes.TrimStart(WhiteSpaceChars).IsEmpty)
{
return (JsonSerializer.Deserialize<TValue>(memoryOwner.Memory.Span, jsonOptions), true);
}
else
{
return (default, true);
}
}
else
{
// no more items in sequence
break;
}
}
// PipeReader needs to read more
return (default, true);
}
}
public class Foo
{
public string Some
{
get;
set;
}
}
我知道这是一个旧的 post,但是最近在 .Net 6 Preview 4 中发布的 System.Text.Json support for IAsyncEnumerable
提供了 OP 中提到的问题的解决方案。
private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
{
using (var stream = await httpResponse.Content.ReadAsStreamAsync())
{
await foreach(var item in JsonSerializer.DeserializeAsyncEnumerable<T>(stream))
{
yield return item;
}
}
}
}
这将提供按需反序列化,在处理大数据时非常有用。请注意,目前该功能仅限于根级别 JSON 数组。
可以找到有关该功能的更多详细信息here