Read text file with IAsyncEnumerable

I came across IAsyncEnumerable while testing C# 8.0 features. I found a good example from Anthony Chu (https://anthonychu.ca/post/async-streams-dotnet-core-3-iasyncenumerable/). It is an async stream and a replacement for Task<IEnumerable<T>>.

// Data Access Layer.
public async IAsyncEnumerable<Product> GetAllProducts()
{
    Container container = cosmosClient.GetContainer(DatabaseId, ContainerId);
    var iterator = container.GetItemQueryIterator<Product>("SELECT * FROM c");
    while (iterator.HasMoreResults)
    {
        foreach (var product in await iterator.ReadNextAsync())
        {
            yield return product;
        }
    }
}

// Usage
await foreach (var product in productsRepository.GetAllProducts())
{
    Console.WriteLine(product);
}

I am wondering whether this can be applied to reading text files, as in the usage below, which reads a file line by line.

foreach (var line in File.ReadLines("Filename"))
{
    // ...process line.
}

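I would really like to know how to apply async with IAsyncEnumerable<string> to the foreach loop above, so that it streams while reading.

How do I implement an iterator so that I can use yield return to read line by line?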

Exactly the same; however, there is no async workload here, so let's pretend:

public async IAsyncEnumerable<string> SomeSortOfAwesomeness()
{
   foreach (var line in File.ReadLines("Filename.txt"))
   {
       // simulates an async workload,
       // otherwise why would we be using IAsyncEnumerable?
       // -- added due to popular demand 
       await Task.Delay(100);
       yield return line;
   }
}

Or the following, which is just a wrapped APM workload; see Stephen Cleary's comments for clarification:

public static async IAsyncEnumerable<string> SomeSortOfAwesomeness()
{
   using StreamReader reader = File.OpenText("Filename.txt");
   while(!reader.EndOfStream)
      yield return await reader.ReadLineAsync();
}

Usage

await foreach(var line in SomeSortOfAwesomeness())
{
   Console.WriteLine(line);
}

Update

From Stephen Cleary's comments:

File.OpenText sadly only allows synchronous I/O; the async APIs are implemented poorly in that scenario. To open a true asynchronous file, you'd need to use a FileStream constructor passing isAsync: true or FileOptions.Asynchronous.
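
As a minimal sketch of what that comment describes, the following opens the file with FileOptions.Asynchronous and streams it line by line. The names AsyncFileLines and ReadLinesTrulyAsync and the 4096-byte buffer are illustrative assumptions, not part of the original answer. Note that in the FileStream constructor overloads the boolean parameter is called useAsync, while IsAsync is the property that reports how the stream was opened.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

public static class AsyncFileLines
{
    // Hypothetical helper: streams lines from a file opened for asynchronous I/O.
    public static async IAsyncEnumerable<string> ReadLinesTrulyAsync(string path)
    {
        // FileOptions.Asynchronous asks the OS for asynchronous (overlapped) I/O,
        // which File.OpenText does not request.
        using var stream = new FileStream(
            path, FileMode.Open, FileAccess.Read, FileShare.Read,
            bufferSize: 4096, options: FileOptions.Asynchronous);
        using var reader = new StreamReader(stream);

        while (true)
        {
            // ReadLineAsync returns null once the end of the file is reached.
            var line = await reader.ReadLineAsync();
            if (line == null) break;
            yield return line;
        }
    }
}
```

It is consumed the same way as the earlier examples, with await foreach over AsyncFileLines.ReadLinesTrulyAsync("Filename.txt").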

ReadLineAsync basically results in this code; as you can see, it is only the Stream APM Begin and End methods wrapped:

private Task<Int32> BeginEndReadAsync(Byte[] buffer, Int32 offset, Int32 count)
{            
     return TaskFactory<Int32>.FromAsyncTrim(
                    this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count },
                    (stream, args, callback, state) => stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
                    (stream, asyncResult) => stream.EndRead(asyncResult)); // cached by compiler
}
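
FromAsyncTrim is internal to the framework, but the public TaskFactory<TResult>.FromAsync method does the same kind of wrapping. The sketch below, with the hypothetical names ApmWrapping and ReadViaApmAsync, shows a Begin/End pair being turned into a Task<int>. It illustrates the pattern only and is not the framework's actual implementation.

```csharp
using System.IO;
using System.Threading.Tasks;

public static class ApmWrapping
{
    // Hypothetical helper: wraps Stream.BeginRead/EndRead (the APM pair) in a Task<int>.
    public static Task<int> ReadViaApmAsync(Stream stream, byte[] buffer, int offset, int count)
    {
        return Task<int>.Factory.FromAsync(
            stream.BeginRead,       // begin method: starts the read
            stream.EndRead,         // end method: returns the number of bytes read
            buffer, offset, count,  // arguments forwarded to BeginRead
            null);                  // no user state object
    }
}
```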

I did some performance tests, and it seems that a large bufferSize, together with the FileOptions.SequentialScan option, helps.

public static async IAsyncEnumerable<string> ReadLinesAsync(string filePath)
{
    using var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read,
        FileShare.Read, 32768, FileOptions.Asynchronous | FileOptions.SequentialScan);
    using var reader = new StreamReader(stream);
    while (true)
    {
        var line = await reader.ReadLineAsync().ConfigureAwait(false);
        if (line == null) break;
        yield return line;
    }
}

The enumeration is not truly asynchronous, though. According to my experiments, the xxxAsync methods of the StreamReader class block the current thread for longer than it takes to await the Task they return. For example, reading a 6 MB file with the ReadToEndAsync method on my PC blocks the current thread for 120 msec before returning the task, and then the task completes in just 20 msec. So I am not sure that there is much value in using these methods. Faking asynchrony is much easier by using the synchronous APIs together with some System.Linq.Async:

IAsyncEnumerable<string> lines = File.ReadLines("SomeFile.txt").ToAsyncEnumerable();
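
The measurement described above could be reproduced with a sketch along these lines. It separates the time spent inside the ReadToEndAsync call itself (before a Task is handed back) from the time spent awaiting that Task. The names BlockingProbe and MeasureReadToEndAsync are hypothetical, the FileStream settings are copied from ReadLinesAsync above, and the numbers will vary with hardware, file size, and runtime version.

```csharp
using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;

public static class BlockingProbe
{
    // Hypothetical helper: returns (time blocked inside the call, time spent awaiting the task).
    public static async Task<(TimeSpan Blocked, TimeSpan Awaited)> MeasureReadToEndAsync(string path)
    {
        using var stream = new FileStream(path, FileMode.Open, FileAccess.Read,
            FileShare.Read, 32768, FileOptions.Asynchronous | FileOptions.SequentialScan);
        using var reader = new StreamReader(stream);

        var stopwatch = Stopwatch.StartNew();

        // Time until the method hands back a Task: the caller's thread is busy for this long.
        Task<string> task = reader.ReadToEndAsync();
        TimeSpan blocked = stopwatch.Elapsed;

        // Time from receiving the Task until it completes.
        await task.ConfigureAwait(false);
        TimeSpan awaited = stopwatch.Elapsed - blocked;

        return (blocked, awaited);
    }
}
```

If the first value is consistently much larger than the second, most of the work is happening synchronously on the calling thread, which is the behavior reported above.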