IAsyncEnumerable 用于永无止境的流

IAsyncEnumerable for never ending stream

我正在构建一个客户端,用于持续使用来自数据源的新记录。集成是基于拉动的,我的客户定期查询数据源以获取新记录。我使用 IAsyncEnumerable 作为 return 类型来表示这个连续的新记录流。以下是相关方法的要点:

public async IAsyncEnumerable<Record> StreamRecords(...)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        using var response = await _httpClient.SendAsync(request, cancellationToken);

        var records = _parser.Parse(response.Content);

        foreach (var r in records) yield return r;
        
        await Task.Delay(waitPeriod, cancellationToken);
    }
}

这是对 IAsyncEnumerable 的适当使用吗?此流应该“永无止境”或“连续”(至少直到 cancellationToken 或错误)。

与大多数事情一样:关键是要明确传达意图和行为。我看不出关于非结束序列的概念问题,无论是 IEnumerable<T> 还是 IAsyncEnumerable<T> - 但显然如果消费者调用 ToList[Async](),事情将以糟糕的方式结束。这本身不是问题 - 给出一个类似的场景:一些 I[Async]Enumerable<T> 序列是 不可重复的 - 它们要么 被迭代一次(随后的尝试失败),或者它们每次都可能产生 不同的结果 (甚至不需要像列表更改这样的事情)。这是完全合法的,但在野外存在的代码假定序列是可重复的并且会产生相同的数据。同样的讨论也存在于此,最终,错的不是 producer,而是 consumer.

因此:您的序列(生产者)将与理解数据应被视为无界的合理消费者一起完美工作。如果这就是您的应用程序所具有的,那么:太好了!

你的 StreamRecords 方法 returns 本质上是一个消费序列,本质上与方法返回的序列相似 BlockingCollection<T>.GetConsumingEnumerable and ChannelReader<T>.ReadAllAsync. Consuming means that when the caller enumerates the sequence, the returned elements are permanently removed from some backing storage. In the case of these two methods the backing storage is an internal ConcurrentQueue<T>. In your case (based on comment) 后备存储位于服务器端,有一些知道接下来要获取什么数据的客户端代码。

公开消费序列会带来一些挑战:

  1. 该方法的名称中是否应包含 ConsumingDestructive 一词?
  2. 如果取消会怎样?当取消信号到达时,序列是否足够响应?来电者的期望是否得到满足?
  3. 如果调用者通过 await foreach 循环中故意 breaking 或 returning 或循环内抛出的瞬态异常不情愿地过早放弃枚举,会发生什么情况?消耗的元素是否有丢失的危险?

关于第一个挑战,您可以阅读 this GitHub issue, or even better watch this video,其中讨论了各种选项。剧透警告,微软工程师解决了看似无害的问题 ReadAllAsync.

关于第二个挑战,您可以阅读 this 问题,表明微软关于 ChannelReader<T>.ReadAllAsync API 做出的(技术上合理的)决定导致 non-intuitive/unexpected行为。

关于第三个挑战,您可以考虑利用迭代器方法中 finally 块的机制。查看 答案了解更多详情。

由于这些细微差别,为调用者提供额外的消费选项可能是个好主意。例如 public Task<Record[]> TakeAllNewRecords()