使用 TaskCompletionSource 构建 IAsyncEnumerable
Build an IAsyncEnumerable using TaskCompletionSource
我有一个方法接受使用 yield 运算符转换的 IEnumerable
和 returns。要转换可枚举的一个元素,我需要先知道另一个元素的值。因此,我想到了使用 TaskCompletionSource
来创建类似 "promise" 的东西。
这里的问题是,如果第一个 TestFieldA
的值不是 "a",则此代码会导致死锁。一种解决方案是在将可枚举传递给方法之前对其进行排序——在这种情况下,完全不需要 TaskCompletionSource
。但是,我想知道如果没有这个是否可以完成。我也知道这可以通过一些 LINQ 查询来完成,但这需要多次枚举输入,我想避免这种情况。
这就是我要实现的目标。 (仅在第一个 TestFieldA == "a"
时有效)
class Test
{
public string TestFieldA {get;set;}
public int TestFieldB {get;set;}
}
private async IAsyncEnumerable<Test> Transform(IEnumerable<Test> inputEnumerable)
{
var tcs = new TaskCompletionSource<int>();
foreach(var input in inputEnumerable)
{
if (input.TestFieldA == "a")
{
tcs.SetResult(input.TestFieldB);
yield return input;
}
else
{
input.TestFieldB -= await tcs.Task;
yield return input;
}
}
}
你目前的计划似乎取决于能否回到过去。我建议只将不合适的项目存储在队列中(而不是放弃它们),直到找到具有 TestFieldA
值的合适项目。
此时,您将所有排队的项目出列,使用现在找到的值并依次产生每个项目。然后 yield 具有所需 TestFieldA
值的项目。
你如何从那里开始有点不清楚,因为我不知道如果 a) 找到另一个 a
项目和 b) 如果没有找到 a
项目你想做什么找到了。
这里不需要 Task
(CompletionSource
)、async
或 IAsyncEnumerable
- 在找到您的 a
之前,您不能产生任何东西价值 - 除非你可以使用时间机器。
还请记住,迭代器依赖于让调用者反复请求新项目以取得进展 - 您在每个 yield
处暂停,直到他们这样做。因此,如果产生的项目有任何“Task
-like”的东西,那么考虑尽早尝试 yield
项目将是极端冒险的;调用者可能决定 await
进行一次而不是继续进行您需要他们进行的枚举。
一个想法可以是 return 可枚举的任务而不是 IAsyncEnumerable
。像这样:
private IEnumerable<Task<Test>> Transform(IEnumerable<Test> source)
{
var tcs = new TaskCompletionSource<int>(
TaskCreationOptions.RunContinuationsAsynchronously);
foreach (var item in source)
{
if (item.TestFieldA == "a")
{
tcs.TrySetResult(item.TestFieldB);
}
yield return TransformItemAsync(item);
}
async Task<Test> TransformItemAsync(Test input)
{
var value = await tcs.Task.ConfigureAwait(false);
input.TestFieldB -= value;
return input;
}
}
如果调用者试图按顺序等待每个任务,这仍然会造成死锁问题。为了解决这个问题,调用者应该有办法按完成顺序以某种方式等待任务。在 Stephen Cleary 的 Nito.AsyncEx
library, the extension method OrderByCompletion
:
中有类似的内容
// Creates a new collection of tasks that complete in order.
public static List<Task<T>> OrderByCompletion<T>(this IEnumerable<Task<T>> @this);
如果需要,您也可以从 here 获取源代码。
我有一个方法接受使用 yield 运算符转换的 IEnumerable
和 returns。要转换可枚举的一个元素,我需要先知道另一个元素的值。因此,我想到了使用 TaskCompletionSource
来创建类似 "promise" 的东西。
这里的问题是,如果第一个 TestFieldA
的值不是 "a",则此代码会导致死锁。一种解决方案是在将可枚举传递给方法之前对其进行排序——在这种情况下,完全不需要 TaskCompletionSource
。但是,我想知道如果没有这个是否可以完成。我也知道这可以通过一些 LINQ 查询来完成,但这需要多次枚举输入,我想避免这种情况。
这就是我要实现的目标。 (仅在第一个 TestFieldA == "a"
时有效)
class Test
{
public string TestFieldA {get;set;}
public int TestFieldB {get;set;}
}
private async IAsyncEnumerable<Test> Transform(IEnumerable<Test> inputEnumerable)
{
var tcs = new TaskCompletionSource<int>();
foreach(var input in inputEnumerable)
{
if (input.TestFieldA == "a")
{
tcs.SetResult(input.TestFieldB);
yield return input;
}
else
{
input.TestFieldB -= await tcs.Task;
yield return input;
}
}
}
你目前的计划似乎取决于能否回到过去。我建议只将不合适的项目存储在队列中(而不是放弃它们),直到找到具有 TestFieldA
值的合适项目。
此时,您将所有排队的项目出列,使用现在找到的值并依次产生每个项目。然后 yield 具有所需 TestFieldA
值的项目。
你如何从那里开始有点不清楚,因为我不知道如果 a) 找到另一个 a
项目和 b) 如果没有找到 a
项目你想做什么找到了。
这里不需要 Task
(CompletionSource
)、async
或 IAsyncEnumerable
- 在找到您的 a
之前,您不能产生任何东西价值 - 除非你可以使用时间机器。
还请记住,迭代器依赖于让调用者反复请求新项目以取得进展 - 您在每个 yield
处暂停,直到他们这样做。因此,如果产生的项目有任何“Task
-like”的东西,那么考虑尽早尝试 yield
项目将是极端冒险的;调用者可能决定 await
进行一次而不是继续进行您需要他们进行的枚举。
一个想法可以是 return 可枚举的任务而不是 IAsyncEnumerable
。像这样:
private IEnumerable<Task<Test>> Transform(IEnumerable<Test> source)
{
var tcs = new TaskCompletionSource<int>(
TaskCreationOptions.RunContinuationsAsynchronously);
foreach (var item in source)
{
if (item.TestFieldA == "a")
{
tcs.TrySetResult(item.TestFieldB);
}
yield return TransformItemAsync(item);
}
async Task<Test> TransformItemAsync(Test input)
{
var value = await tcs.Task.ConfigureAwait(false);
input.TestFieldB -= value;
return input;
}
}
如果调用者试图按顺序等待每个任务,这仍然会造成死锁问题。为了解决这个问题,调用者应该有办法按完成顺序以某种方式等待任务。在 Stephen Cleary 的 Nito.AsyncEx
library, the extension method OrderByCompletion
:
// Creates a new collection of tasks that complete in order.
public static List<Task<T>> OrderByCompletion<T>(this IEnumerable<Task<T>> @this);
如果需要,您也可以从 here 获取源代码。