如何安全地迭代 IAsyncEnumerable 以向下游发送集合以批量处理消息
How to safely iterate over an IAsyncEnumerable to send a collection downstream for message processing in batches
我看过 LINQ with IAsyncEnumerable 上的聊天,它让我对处理 IAsyncEnumerables 的扩展方法有了一些了解,但坦率地说,对于实际应用程序来说还不够详细,尤其是就我的经验水平而言,而且我知道 samples/documentation 对于 IAsyncEnumerable
s
还不存在
我正在尝试从文件中读取,对流进行一些转换,返回 IAsyncEnumerable
,然后在获得任意数量的对象后将这些对象发送到下游,例如:
await foreach (var data in ProcessBlob(downloadedFile))
{
//todo add data to List<T> called listWithPreConfiguredNumberOfElements
if (listWithPreConfiguredNumberOfElements.Count == preConfiguredNumber)
await _messageHandler.Handle(listWithPreConfiguredNumberOfElements);
//repeat the behaviour till all the elements in the IAsyncEnumerable returned by ProcessBlob are sent downstream to the _messageHandler.
}
到目前为止,我对此事的理解是 await foreach
行正在处理使用 Task
s(或 ValueTask
s)的数据,因此我们没有预先计算。我也对使用 List 变量犹豫不决,只是对其进行长度检查,因为跨线程共享数据似乎不是线程安全的。
我正在使用 System.Linq.Async
包,希望我可以使用相关的扩展方法。我可以看到一些 TakeWhile
形式的承诺,但我对我打算执行的任务的线程安全性的理解并不完全,这让我失去了信心。
任何帮助或推动正确的方向将不胜感激,谢谢。
包 System.Interactive.Async 中有一个运算符 Buffer
可以满足您的需求。
// Projects each element of an async-enumerable sequence into consecutive
// non-overlapping buffers which are produced based on element count information.
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source, int count);
此包包含 Amb
、Throw
、Catch
、Defer
、Finally
等在 Linq 中没有直接等效项的运算符,但他们在 System.Reactive 中确实有一个等价物。这是因为 IAsyncEnumerable
s 在概念上更接近 IObservable
s 而不是 IEnumerable
s(因为两者都有时间维度,而 IEnumerable
s 是永恒的)。
I'm also hesitant to use a List variable and just do a length-check on that as sharing that data across threads doesn't seem very thread-safe.
在处理 async
时,您需要考虑 执行流 ,而不是线程;由于您正在 await
-ing 处理步骤,因此访问列表实际上没有并发问题,因为无论使用哪个线程:列表一次只能访问一次。
如果您仍然担心,您可以new
每批列出一个列表,但这可能有点矫枉过正。然而,您做需要的是两个添加 - 批次之间的重置和最终处理步骤:
var listWithPreConfiguredNumberOfElements = new List<YourType>(preConfiguredNumber);
await foreach (var data in ProcessBlob(downloadedFile)) // CAF?
{
listWithPreConfiguredNumberOfElements.Add(data);
if (listWithPreConfiguredNumberOfElements.Count == preConfiguredNumber)
{
await _messageHandler.Handle(listWithPreConfiguredNumberOfElements); // CAF?
listWithPreConfiguredNumberOfElements.Clear(); // reset for a new batch
// (replace this with a "new" if you're still concerned about concurrency)
}
}
if (listWithPreConfiguredNumberOfElements.Any())
{ // process any stragglers
await _messageHandler.Handle(listWithPreConfiguredNumberOfElements); // CAF?
}
您可能也选择在标记为// CAF?
的三个位置使用ConfigureAwait(false)
我看过 LINQ with IAsyncEnumerable 上的聊天,它让我对处理 IAsyncEnumerables 的扩展方法有了一些了解,但坦率地说,对于实际应用程序来说还不够详细,尤其是就我的经验水平而言,而且我知道 samples/documentation 对于 IAsyncEnumerable
s
我正在尝试从文件中读取,对流进行一些转换,返回 IAsyncEnumerable
,然后在获得任意数量的对象后将这些对象发送到下游,例如:
await foreach (var data in ProcessBlob(downloadedFile))
{
//todo add data to List<T> called listWithPreConfiguredNumberOfElements
if (listWithPreConfiguredNumberOfElements.Count == preConfiguredNumber)
await _messageHandler.Handle(listWithPreConfiguredNumberOfElements);
//repeat the behaviour till all the elements in the IAsyncEnumerable returned by ProcessBlob are sent downstream to the _messageHandler.
}
到目前为止,我对此事的理解是 await foreach
行正在处理使用 Task
s(或 ValueTask
s)的数据,因此我们没有预先计算。我也对使用 List 变量犹豫不决,只是对其进行长度检查,因为跨线程共享数据似乎不是线程安全的。
我正在使用 System.Linq.Async
包,希望我可以使用相关的扩展方法。我可以看到一些 TakeWhile
形式的承诺,但我对我打算执行的任务的线程安全性的理解并不完全,这让我失去了信心。
任何帮助或推动正确的方向将不胜感激,谢谢。
包 System.Interactive.Async 中有一个运算符 Buffer
可以满足您的需求。
// Projects each element of an async-enumerable sequence into consecutive
// non-overlapping buffers which are produced based on element count information.
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source, int count);
此包包含 Amb
、Throw
、Catch
、Defer
、Finally
等在 Linq 中没有直接等效项的运算符,但他们在 System.Reactive 中确实有一个等价物。这是因为 IAsyncEnumerable
s 在概念上更接近 IObservable
s 而不是 IEnumerable
s(因为两者都有时间维度,而 IEnumerable
s 是永恒的)。
I'm also hesitant to use a List variable and just do a length-check on that as sharing that data across threads doesn't seem very thread-safe.
在处理 async
时,您需要考虑 执行流 ,而不是线程;由于您正在 await
-ing 处理步骤,因此访问列表实际上没有并发问题,因为无论使用哪个线程:列表一次只能访问一次。
如果您仍然担心,您可以new
每批列出一个列表,但这可能有点矫枉过正。然而,您做需要的是两个添加 - 批次之间的重置和最终处理步骤:
var listWithPreConfiguredNumberOfElements = new List<YourType>(preConfiguredNumber);
await foreach (var data in ProcessBlob(downloadedFile)) // CAF?
{
listWithPreConfiguredNumberOfElements.Add(data);
if (listWithPreConfiguredNumberOfElements.Count == preConfiguredNumber)
{
await _messageHandler.Handle(listWithPreConfiguredNumberOfElements); // CAF?
listWithPreConfiguredNumberOfElements.Clear(); // reset for a new batch
// (replace this with a "new" if you're still concerned about concurrency)
}
}
if (listWithPreConfiguredNumberOfElements.Any())
{ // process any stragglers
await _messageHandler.Handle(listWithPreConfiguredNumberOfElements); // CAF?
}
您可能也选择在标记为// CAF?
ConfigureAwait(false)