使用 TaskCompletionSource 构建 IAsyncEnumerable

Build an IAsyncEnumerable using TaskCompletionSource

我有一个方法接受使用 yield 运算符转换的 IEnumerable 和 returns。要转换可枚举的一个元素,我需要先知道另一个元素的值。因此,我想到了使用 TaskCompletionSource 来创建类似 "promise" 的东西。

这里的问题是,如果第一个 TestFieldA 的值不是 "a",则此代码会导致死锁。一种解决方案是在将可枚举传递给方法之前对其进行排序——在这种情况下,完全不需要 TaskCompletionSource 。但是,我想知道如果没有这个是否可以完成。我也知道这可以通过一些 LINQ 查询来完成,但这需要多次枚举输入,我想避免这种情况。

这就是我要实现的目标。 (仅在第一个 TestFieldA == "a" 时有效)

class Test
{
    public string TestFieldA {get;set;}
    public int TestFieldB {get;set;}
}


private async IAsyncEnumerable<Test> Transform(IEnumerable<Test> inputEnumerable)
{
    var tcs = new TaskCompletionSource<int>();

    foreach(var input in inputEnumerable)
    {
        if (input.TestFieldA == "a")
        {
            tcs.SetResult(input.TestFieldB);
            yield return input;
        }
        else
        {
            input.TestFieldB -= await tcs.Task;
            yield return input;
        }
    }
}

你目前的计划似乎取决于能否回到过去。我建议只将不合适的项目存储在队列中(而不是放弃它们),直到找到具有 TestFieldA 值的合适项目。

此时,您将所有排队的项目出列,使用现在找到的值并依次产生每个项目。然后 yield 具有所需 TestFieldA 值的项目。

你如何从那里开始有点不清楚,因为我不知道如果 a) 找到另一个 a 项目和 b) 如果没有找到 a 项目你想做什么找到了。

这里不需要 Task(CompletionSource)、asyncIAsyncEnumerable - 在找到您的 a 之前,您不能产生任何东西价值 - 除非你可以使用时间机器。


还请记住,迭代器依赖于让调用者反复请求新项目以取得进展 - 您在每个 yield 处暂停,直到他们这样做。因此,如果产生的项目有任何“Task-like”的东西,那么考虑尽早尝试 yield 项目将是极端冒险的;调用者可能决定 await 进行一次而不是继续进行您需要他们进行的枚举。

一个想法可以是 return 可枚举的任务而不是 IAsyncEnumerable。像这样:

private IEnumerable<Task<Test>> Transform(IEnumerable<Test> source)
{
    var tcs = new TaskCompletionSource<int>(
        TaskCreationOptions.RunContinuationsAsynchronously);

    foreach (var item in source)
    {
        if (item.TestFieldA == "a")
        {
            tcs.TrySetResult(item.TestFieldB);
        }
        yield return TransformItemAsync(item);
    }

    async Task<Test> TransformItemAsync(Test input)
    {
        var value = await tcs.Task.ConfigureAwait(false);
        input.TestFieldB -= value;
        return input;
    }
}

如果调用者试图按顺序等待每个任务,这仍然会造成死锁问题。为了解决这个问题,调用者应该有办法按完成顺序以某种方式等待任务。在 Stephen Cleary 的 Nito.AsyncEx library, the extension method OrderByCompletion:

中有类似的内容
// Creates a new collection of tasks that complete in order.
public static List<Task<T>> OrderByCompletion<T>(this IEnumerable<Task<T>> @this);

如果需要,您也可以从 here 获取源代码。