在 Action 中使用带有 await 的 ForEachAsync 时不等待

Doesn't await when using ForEachAsync with await inside Action

下面应该return"C",但是returns "B"

using System.Data.Entity;
//...
var state = "A";
var qry = (from f in db.myTable select f);
await qry.ForEachAsync(async (myRecord) => {
   await DoStuffAsync(myRecord);
   state = "B";
});
state = "C";
return state;

它不等待 DoStuffAsync 完成,state="C" 运行,然后 state="B" 执行(因为它仍在等待)。

那是因为 ForEachAsync 的实现不等待委托操作

moveNextTask = enumerator.MoveNextAsync(cancellationToken);
action(current);

https://github.com/mono/entityframework/blob/master/src/EntityFramework/Infrastructure/IDbAsyncEnumerableExtensions.cs#L19

但那是因为,您不能等待一个动作,委托需要是一个 Func,其中 returns 一个任务 - 请参阅 How do you implement an async action delegate method?

因此,在 Microsoft 提供包含 Func 委托并使用 await 调用它的签名之前,您必须推出自己的扩展方法。我现在正在使用以下内容。

public static async Task ForEachAsync<T>(
    this IQueryable<T> enumerable, Func<T, Task> action, CancellationToken cancellationToken) //Now with Func returning Task
{
    var asyncEnumerable = (IDbAsyncEnumerable<T>)enumerable;
    using (var enumerator = asyncEnumerable.GetAsyncEnumerator())
    {

        if (await enumerator.MoveNextAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false))
        {
            Task<bool> moveNextTask;
            do
            {
                var current = enumerator.Current;
                moveNextTask = enumerator.MoveNextAsync(cancellationToken);
                await action(current); //now with await
            }
            while (await moveNextTask.ConfigureAwait(continueOnCapturedContext: false));
        }
    }
}

有了这个,您 OP 中的原始测试代码将按预期工作。

由于 DbSet 实现了 IAsyncEnumerable,请考虑使用以下扩展方法:

public async static Task ForEachAsync<T>(this IAsyncEnumerable<T> source, Func<T, Task> action, CancellationToken cancellationToken = default)
{
    if (source == null) return;
    await foreach (T item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
    {
        await action(item);
    }
}

用法:

var qry = (from f in db.myTable select f);
await qry
     .AsAsyncEnumerable()
     .ForEachAsync(async arg =>
     {
         await DoStuffAsync(arg);
     });