如何安全地迭代 IAsyncEnumerable 以向下游发送集合以批量处理消息

How to safely iterate over an IAsyncEnumerable to send a collection downstream for message processing in batches

我看过 LINQ with IAsyncEnumerable 上的聊天,它让我对处理 IAsyncEnumerables 的扩展方法有了一些了解,但坦率地说,对于实际应用程序来说还不够详细,尤其是就我的经验水平而言,而且我知道 samples/documentation 对于 IAsyncEnumerables

还不存在

我正在尝试从文件中读取,对流进行一些转换,返回 IAsyncEnumerable,然后在获得任意数量的对象后将这些对象发送到下游,例如:

await foreach (var data in ProcessBlob(downloadedFile))
{
    //todo add data to List<T> called listWithPreConfiguredNumberOfElements
    if (listWithPreConfiguredNumberOfElements.Count == preConfiguredNumber)
        await _messageHandler.Handle(listWithPreConfiguredNumberOfElements);
        
    //repeat the behaviour till all the elements in the IAsyncEnumerable returned by ProcessBlob are sent downstream to the _messageHandler.
}

到目前为止,我对此事的理解是 await foreach 行正在处理使用 Tasks(或 ValueTasks)的数据,因此我们没有预先计算。我也对使用 List 变量犹豫不决,只是对其进行长度检查,因为跨线程共享数据似乎不是线程安全的。

我正在使用 System.Linq.Async 包,希望我可以使用相关的扩展方法。我可以看到一些 TakeWhile 形式的承诺,但我对我打算执行的任务的线程安全性的理解并不完全,这让我失去了信心。

任何帮助或推动正确的方向将不胜感激,谢谢。

System.Interactive.Async 中有一个运算符 Buffer 可以满足您的需求。

// Projects each element of an async-enumerable sequence into consecutive
// non-overlapping buffers which are produced based on element count information.
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, int count);

此包包含 AmbThrowCatchDeferFinally 等在 Linq 中没有直接等效项的运算符,但他们在 System.Reactive 中确实有一个等价物。这是因为 IAsyncEnumerables 在概念上更接近 IObservables 而不是 IEnumerables(因为两者都有时间维度,而 IEnumerables 是永恒的)。

I'm also hesitant to use a List variable and just do a length-check on that as sharing that data across threads doesn't seem very thread-safe.

在处理 async 时,您需要考虑 执行流 ,而不是线程;由于您正在 await-ing 处理步骤,因此访问列表实际上没有并发问题,因为无论使用哪个线程:列表一次只能访问一次。

如果您仍然担心,您可以new每批列出一个列表,但这可能有点矫枉过正。然而,您需要的是两个添加 - 批次之间的重置和最终处理步骤:

var listWithPreConfiguredNumberOfElements = new List<YourType>(preConfiguredNumber);
await foreach (var data in ProcessBlob(downloadedFile)) // CAF?
{
    listWithPreConfiguredNumberOfElements.Add(data);
    if (listWithPreConfiguredNumberOfElements.Count == preConfiguredNumber)
    {
        await _messageHandler.Handle(listWithPreConfiguredNumberOfElements); // CAF?
        listWithPreConfiguredNumberOfElements.Clear(); // reset for a new batch
        // (replace this with a "new" if you're still concerned about concurrency)
    }
}
if (listWithPreConfiguredNumberOfElements.Any())
{   // process any stragglers
    await _messageHandler.Handle(listWithPreConfiguredNumberOfElements); // CAF?
}

您可能选择在标记为// CAF?

的三个位置使用ConfigureAwait(false)