我如何将一个 IEnumerable 异步转换为另一个,就像 LINQ 的 Select() 一样,但对每个转换后的项目使用 await?
How can I asynchronously transform one IEnumerable to another, just like LINQ's Select(), but using await on every transformed item?
考虑这种情况:
class Product { }
interface IWorker
{
Task<Product> CreateProductAsync();
}
我现在得到一个 IEnumerable<IWorker> workers
,我应该从中创建一个 IEnumerable<Product>
,我必须将其传递给其他一些我无法更改的函数:
void CheckProducts(IEnumerable<Product> products);
此方法需要访问整个 IEnumerable<Product>
。无法细分它并在多个子集上调用 CheckProducts
。
一个明显的解决方案是:
CheckProducts(workers.Select(worker => worker.CreateProductAsync().Result));
但这当然是阻塞的,因此这只是我最后的选择。
从语法上讲,我正是需要这个,只是没有阻塞。
我不能在传递给 Select()
的函数内部使用 await
因为我必须将它标记为 async
并且需要它 return 一个 Task
本身,我将一无所获。最后我需要一个 IEnumerable<Product>
而不是 IEnumerable<Task<Product>>
.
请务必了解生产产品的工人的顺序很重要,他们的工作不得重叠。否则,我会这样做:
async Task<IEnumerable<Product>> CreateProductsAsync(IEnumerable<IWorker> workers)
{
var tasks = workers.Select(worker => worker.CreateProductAsync());
return await Task.WhenAll(tasks);
}
但不幸的是,Task.WhenAll()
并行执行了一些任务,而我需要它们顺序执行。
如果我有一个 IReadOnlyList<IWorker>
而不是 IEnumerable<IWorker>
:
,这是实现它的一种可能性
async Task<IEnumerable<Product>> CreateProductsAsync(IReadOnlyList<IWorker> workers)
{
var resultList = new Product[workers.Count];
for (int i = 0; i < resultList.Length; ++i)
resultList[i] = await workers[i].CreateProductAsync();
return resultList;
}
但我必须处理一个 IEnumerable
,更糟糕的是,它通常非常庞大,有时甚至是无限的,永远在生产工人。如果我知道它的大小合适,我会在它上面调用 ToArray()
并使用上面的方法。
最终的解决方案是这样的:
async Task<IEnumerable<Product>> CreateProductsAsync(IEnumerable<IWorker> workers)
{
foreach (var worker in workers)
yield return await worker.CreateProductAsync();
}
但是 yield
和 await
不兼容,如 this answer 中所述。看看那个答案,那个假设的 IAsyncEnumerator
对我有帮助吗? C# 中是否同时存在类似的东西?
我面临的问题总结:
- 我有无限可能
IEnumerable<IWorker>
- 我想按照它们进入时的相同顺序异步调用
CreateProductAsync()
- 最后我需要一个
IEnumerable<Product>
我已经尝试但不起作用的总结:
- 我不能使用
Task.WhenAll()
因为它并行执行任务。
- 我无法使用
ToArray()
并在循环中手动处理该数组,因为我的序列有时是无穷无尽的。
- 我无法使用
yield return
,因为它与 await
不兼容。
有人为我提供解决方案或解决方法吗?
否则我将不得不使用那个阻塞代码...
除非我误解了什么,否则我不明白你为什么不直接这样做:
var productList = new List<Product>(workers.Count())
foreach(var worker in workers)
{
productList.Add(await worker.CreateProductAsync());
}
CheckProducts(productList);
如果您只是继续清除大小为 1 的列表会怎么样?
var productList = new List<Product>(1);
var checkTask = Task.CompletedTask;
foreach(var worker in workers)
{
await checkTask;
productList.Clear();
productList.Add(await worker.CreateProductAsync());
checkTask = Task.Run(CheckProducts(productList));
}
await checkTask;
根据您的要求,我可以汇总以下内容:
1) 工人按顺序处理
2)随时开放接收新Worker
因此,利用数据流 TransformBlock
具有内置队列并按顺序处理项目这一事实。现在我们可以随时接受生产者的Workers
接下来我们制作TransformBlock
观察结果,以便消费者可以按需消费Products
。
进行了一些快速更改并启动了消费者部分。这只是获取 Transformer
生成的可观察对象并将其映射到生成每个产品的可枚举对象。这里的背景是 ToEnumerable()
.
The ToEnumerator operator returns an enumerator from an observable sequence. The enumerator will yield each item in the sequence as it is produced
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace ClassLibrary1
{
public class WorkerProducer
{
public async Task ProduceWorker()
{
//await ProductTransformer_Transformer.SendAsync(new Worker())
}
}
public class ProductTransformer
{
public IObservable<Product> Products { get; private set; }
public TransformBlock<Worker, Product> Transformer { get; private set; }
private Task<Product> CreateProductAsync(Worker worker) => Task.FromResult(new Product());
public ProductTransformer()
{
Transformer = new TransformBlock<Worker, Product>(wrk => CreateProductAsync(wrk));
Products = Transformer.AsObservable();
}
}
public class ProductConsumer
{
private ThirdParty ThirdParty { get; set; } = new ThirdParty();
private ProductTransformer Transformer { get; set; }
public ProductConsumer()
{
ThirdParty.CheckProducts(Transformer.Products.ToEnumerable());
}
public class Worker { }
public class Product { }
public class ThirdParty
{
public void CheckProducts(IEnumerable<Product> products)
{
}
}
}
尝试
workers.ForEach(async wrkr =>
{
var prdlist = await wrkr.CreateProductAsync();
//Remaing tasks....
});
给你两个建议 OP
多次调用解决方案
如果您被允许多次调用 CheckProducts,您可以简单地这样做:
foreach (var worker in workers)
{
var product = await worker.CreateProductAsync();
CheckProducts(new [] { product } );
}
如果它增加了价值,我敢肯定您也可以想出一种分批进行的方法,比如一次 100 个。
线程池解决方案
如果不允许多次调用CheckProducts
,并且不允许修改CheckProducts
,则无法强制它放弃控制权并允许其他延续到 运行。因此,无论您做什么,都不能将异步性强加到您传递给它的 IEnumerable 中,这不仅是因为编译器检查,还因为它可能会死锁。
所以这是一个线程池解决方案。这个想法是创建一个单独的线程来处理系列产品;处理器是异步的,因此对 CreateProductAsync()
的调用仍将根据需要让出对已发布到同步上下文的任何其他内容的控制。但是它不能神奇地强制 CheckProduct
放弃控制,所以如果它能够比创建产品更快地检查产品,它仍然有可能偶尔会阻塞。在我的示例中,我使用 Monitor.Wait()
因此 O/S 不会安排线程,直到有东西在等待它。当它阻塞时,你仍然会用完一个线程资源,但至少你不会在忙等待循环中浪费 CPU 时间。
public static IEnumerable<Product> CreateProducts(IEnumerable<Worker> workers)
{
var queue = new ConcurrentQueue<Product>();
var task = Task.Run(() => ConvertProducts(workers.GetEnumerator(), queue));
while (true)
{
while (queue.Count > 0)
{
Product product;
var ok = queue.TryDequeue(out product);
if (ok) yield return product;
}
if (task.IsCompleted && queue.Count == 0) yield break;
Monitor.Wait(queue, 1000);
}
}
private static async Task ConvertProducts(IEnumerator<Worker> input, ConcurrentQueue<Product> output)
{
while (input.MoveNext())
{
var current = input.Current;
var product = await current.CreateProductAsync();
output.Enqueue(product);
Monitor.Pulse(output);
}
}
您可以使用 Task.WhenAll
,但不是 returning Task.WhenAll
的结果,return 任务集合转换为结果集合。
async Task<IEnumerable<Product>> CreateProductsAsync(IEnumerable<IWorker> workers)
{
var tasks = workers.Select(worker => worker.CreateProductAsync()).ToList();
await Task.WhenAll(tasks);
return tasks.Select(task => task.Result);
}
任务顺序将保持不变。
似乎只需要 return await Task.WhenAll()
就可以了
来自 Task.WhenAll Method (IEnumerable>)
的文档
The Task.Result property of the returned task will be set to
an array containing all of the results of the supplied tasks in the
same order as they were provided...
如果 worker 需要按照创建的顺序一个一个地执行,并且基于另一个功能需要整个 worker 结果集的要求
async Task<IEnumerable<Product>> CreateProductsAsync(IEnumerable<IWorker> workers)
{
var products = new List<product>();
foreach (var worker in workers)
{
product = await worker.CreateProductAsync();
products.Add(product);
}
return products;
}
情况:
在这里你说你需要同步执行此操作,因为 IEnumerable 不支持异步并且要求是你需要一个 IEnumerable<Product>
.
I am now given an IEnumerable workers and am supposed to
create an IEnumerable from it that I have to pass to some
other function that I cannot alter:
这里你说 整个 产品集需要同时处理,大概是对 void CheckProducts(IEnumerable<Product> products).
进行一次调用
This methods needs to check the entire Product set as a whole. It is
not possible to subdivide the result.
这里你说可枚举可以产生无限数量的项目
But I must deal with an IEnumerable and, even worse, it is usually
quite huge, sometimes it is even unlimited, yielding workers forever.
If I knew that its size was decent, I would just call ToArray() on it
and use the method above.
所以让我们把这些放在一起。您需要在同步环境中对不定数量的项目进行异步处理,然后将整个集合作为一个整体进行评估...同步。
潜在问题:
- 1:要对一个集合进行整体评估,必须将其完全枚举。要完全枚举一个集合,它必须是有限的。因此,不可能将无限集合作为一个整体来评估。
- 2:在同步和异步之间来回切换强制异步代码同步到运行。从需求的角度来看,这可能没问题,但从技术的角度来看,它可能会导致死锁(也许是不可避免的,我不知道。查一下。我不是专家)。
问题 1 的可能解决方案:
- 1:强制源为
ICollection<T>
而不是 IEnumerable<T>
。这强制了有限性。
- 2:改变 CheckProducts 算法以迭代处理,可能产生 中间 结果,同时仍然在内部保持持续聚合。
问题 2 的可能解决方案:
- 1: 使
CheckProducts
方法异步。
- 2:使
CreateProduct...
方法同步。
底线
你不能按照你的要求去做,这听起来像是别人在口述你的要求。他们需要更改一些要求,因为他们要求的(我真的很讨厌使用这个词)是不可能的。您是否误解了某些要求?
IEnumerator 是同步接口,因此如果 CheckProducts 在下一个工作人员完成创建产品之前枚举下一个产品,则阻塞是不可避免的。
不过,您可以通过在另一个线程上创建产品、将它们添加到 BlockingCollection<T> 并在主线程上产生它们来实现并行性:
static IEnumerable<Product> CreateProducts(IEnumerable<IWorker> workers)
{
var products = new BlockingCollection<Product>(3);
Task.Run(async () => // On the thread pool...
{
foreach (IWorker worker in workers)
{
Product product = await worker.CreateProductAsync(); // Create products serially.
products.Add(product); // Enqueue the product, blocking if the queue is full.
}
products.CompleteAdding(); // Notify GetConsumingEnumerable that we're done.
});
return products.GetConsumingEnumerable();
}
为避免无限内存消耗,您可以选择将队列的容量指定为 BlockingCollection 的构造函数参数。我在上面的代码中使用了 3.
您现在可以使用 async
、IEnumerable
和 LINQ 执行此操作,但异步之后链中的每个方法都是 Task<T>
,并且您需要使用类似 await Task.WhenAll
最后。您可以在 return Task<T>
的 LINQ 方法中使用异步 lambda。这些你不需要同步等待。
Select
将按顺序启动您的任务,即在 select 枚举每个任务之前,它们甚至不会作为任务存在,并且在您停止枚举后不会继续执行。如果您想 await
单独 await
任务,您也可以 运行 自己 foreach
完成任务。
你可以 break
像其他任何东西一样 foreach
而不是它启动所有这些,所以这也适用于无限枚举。
public async Task Main()
{
// This async method call could also be an async lambda
foreach (var task in GetTasks())
{
var result = await task;
Console.WriteLine($"Result is {result}");
if (result > 5) break;
}
}
private IEnumerable<Task<int>> GetTasks()
{
return GetNumbers().Select(WaitAndDoubleAsync);
}
private async Task<int> WaitAndDoubleAsync(int i)
{
Console.WriteLine($"Waiting {i} seconds asynchronously");
await Task.Delay(TimeSpan.FromSeconds(i));
return i * 2;
}
/// Keeps yielding numbers
private IEnumerable<int> GetNumbers()
{
var i = 0;
while (true) yield return i++;
}
输出以下内容,然后停止:
Waiting 0 seconds asynchronously
Result is 0
Waiting 1 seconds asynchronously
Result is 2
Waiting 2 seconds asynchronously
Result is 4
Waiting 3 seconds asynchronously
Result is 6
重要的是你不能在同一个方法中混合yield
和await
,但是你可以yield
Task
s return从一个使用 await
的方法中编辑出来的非常好,所以你可以通过将它们拆分成单独的方法来一起使用它们。 Select
已经是使用 yield
的方法,因此您可能不需要为此编写自己的方法。
在您的 post 中,您正在寻找 Task<IEnumerable<Product>>
,但您实际可以使用的是 IEnumerable<Task<Product>>
。
你可以更进一步,例如如果你有类似 REST API 的东西,其中一个资源可以链接到其他资源,比如你只是想获得一个组的用户列表,但是当你找到你感兴趣的用户时停止:
public async Task<IEnumerable<Task<User>>> GetUserTasksAsync(int groupId)
{
var group = await GetGroupAsync(groupId);
return group.UserIds.Select(GetUserAsync);
}
foreach (var task in await GetUserTasksAsync(1))
{
var user = await task;
...
}
考虑这种情况:
class Product { }
interface IWorker
{
Task<Product> CreateProductAsync();
}
我现在得到一个 IEnumerable<IWorker> workers
,我应该从中创建一个 IEnumerable<Product>
,我必须将其传递给其他一些我无法更改的函数:
void CheckProducts(IEnumerable<Product> products);
此方法需要访问整个 IEnumerable<Product>
。无法细分它并在多个子集上调用 CheckProducts
。
一个明显的解决方案是:
CheckProducts(workers.Select(worker => worker.CreateProductAsync().Result));
但这当然是阻塞的,因此这只是我最后的选择。 从语法上讲,我正是需要这个,只是没有阻塞。
我不能在传递给 Select()
的函数内部使用 await
因为我必须将它标记为 async
并且需要它 return 一个 Task
本身,我将一无所获。最后我需要一个 IEnumerable<Product>
而不是 IEnumerable<Task<Product>>
.
请务必了解生产产品的工人的顺序很重要,他们的工作不得重叠。否则,我会这样做:
async Task<IEnumerable<Product>> CreateProductsAsync(IEnumerable<IWorker> workers)
{
var tasks = workers.Select(worker => worker.CreateProductAsync());
return await Task.WhenAll(tasks);
}
但不幸的是,Task.WhenAll()
并行执行了一些任务,而我需要它们顺序执行。
如果我有一个 IReadOnlyList<IWorker>
而不是 IEnumerable<IWorker>
:
async Task<IEnumerable<Product>> CreateProductsAsync(IReadOnlyList<IWorker> workers)
{
var resultList = new Product[workers.Count];
for (int i = 0; i < resultList.Length; ++i)
resultList[i] = await workers[i].CreateProductAsync();
return resultList;
}
但我必须处理一个 IEnumerable
,更糟糕的是,它通常非常庞大,有时甚至是无限的,永远在生产工人。如果我知道它的大小合适,我会在它上面调用 ToArray()
并使用上面的方法。
最终的解决方案是这样的:
async Task<IEnumerable<Product>> CreateProductsAsync(IEnumerable<IWorker> workers)
{
foreach (var worker in workers)
yield return await worker.CreateProductAsync();
}
但是 yield
和 await
不兼容,如 this answer 中所述。看看那个答案,那个假设的 IAsyncEnumerator
对我有帮助吗? C# 中是否同时存在类似的东西?
我面临的问题总结:
- 我有无限可能
IEnumerable<IWorker>
- 我想按照它们进入时的相同顺序异步调用
CreateProductAsync()
- 最后我需要一个
IEnumerable<Product>
我已经尝试但不起作用的总结:
- 我不能使用
Task.WhenAll()
因为它并行执行任务。 - 我无法使用
ToArray()
并在循环中手动处理该数组,因为我的序列有时是无穷无尽的。 - 我无法使用
yield return
,因为它与await
不兼容。
有人为我提供解决方案或解决方法吗? 否则我将不得不使用那个阻塞代码...
除非我误解了什么,否则我不明白你为什么不直接这样做:
var productList = new List<Product>(workers.Count())
foreach(var worker in workers)
{
productList.Add(await worker.CreateProductAsync());
}
CheckProducts(productList);
如果您只是继续清除大小为 1 的列表会怎么样?
var productList = new List<Product>(1);
var checkTask = Task.CompletedTask;
foreach(var worker in workers)
{
await checkTask;
productList.Clear();
productList.Add(await worker.CreateProductAsync());
checkTask = Task.Run(CheckProducts(productList));
}
await checkTask;
根据您的要求,我可以汇总以下内容:
1) 工人按顺序处理
2)随时开放接收新Worker
因此,利用数据流 TransformBlock
具有内置队列并按顺序处理项目这一事实。现在我们可以随时接受生产者的Workers
接下来我们制作TransformBlock
观察结果,以便消费者可以按需消费Products
。
进行了一些快速更改并启动了消费者部分。这只是获取 Transformer
生成的可观察对象并将其映射到生成每个产品的可枚举对象。这里的背景是 ToEnumerable()
.
The ToEnumerator operator returns an enumerator from an observable sequence. The enumerator will yield each item in the sequence as it is produced
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace ClassLibrary1
{
public class WorkerProducer
{
public async Task ProduceWorker()
{
//await ProductTransformer_Transformer.SendAsync(new Worker())
}
}
public class ProductTransformer
{
public IObservable<Product> Products { get; private set; }
public TransformBlock<Worker, Product> Transformer { get; private set; }
private Task<Product> CreateProductAsync(Worker worker) => Task.FromResult(new Product());
public ProductTransformer()
{
Transformer = new TransformBlock<Worker, Product>(wrk => CreateProductAsync(wrk));
Products = Transformer.AsObservable();
}
}
public class ProductConsumer
{
private ThirdParty ThirdParty { get; set; } = new ThirdParty();
private ProductTransformer Transformer { get; set; }
public ProductConsumer()
{
ThirdParty.CheckProducts(Transformer.Products.ToEnumerable());
}
public class Worker { }
public class Product { }
public class ThirdParty
{
public void CheckProducts(IEnumerable<Product> products)
{
}
}
}
尝试
workers.ForEach(async wrkr =>
{
var prdlist = await wrkr.CreateProductAsync();
//Remaing tasks....
});
给你两个建议 OP
多次调用解决方案
如果您被允许多次调用 CheckProducts,您可以简单地这样做:
foreach (var worker in workers)
{
var product = await worker.CreateProductAsync();
CheckProducts(new [] { product } );
}
如果它增加了价值,我敢肯定您也可以想出一种分批进行的方法,比如一次 100 个。
线程池解决方案
如果不允许多次调用CheckProducts
,并且不允许修改CheckProducts
,则无法强制它放弃控制权并允许其他延续到 运行。因此,无论您做什么,都不能将异步性强加到您传递给它的 IEnumerable 中,这不仅是因为编译器检查,还因为它可能会死锁。
所以这是一个线程池解决方案。这个想法是创建一个单独的线程来处理系列产品;处理器是异步的,因此对 CreateProductAsync()
的调用仍将根据需要让出对已发布到同步上下文的任何其他内容的控制。但是它不能神奇地强制 CheckProduct
放弃控制,所以如果它能够比创建产品更快地检查产品,它仍然有可能偶尔会阻塞。在我的示例中,我使用 Monitor.Wait()
因此 O/S 不会安排线程,直到有东西在等待它。当它阻塞时,你仍然会用完一个线程资源,但至少你不会在忙等待循环中浪费 CPU 时间。
public static IEnumerable<Product> CreateProducts(IEnumerable<Worker> workers)
{
var queue = new ConcurrentQueue<Product>();
var task = Task.Run(() => ConvertProducts(workers.GetEnumerator(), queue));
while (true)
{
while (queue.Count > 0)
{
Product product;
var ok = queue.TryDequeue(out product);
if (ok) yield return product;
}
if (task.IsCompleted && queue.Count == 0) yield break;
Monitor.Wait(queue, 1000);
}
}
private static async Task ConvertProducts(IEnumerator<Worker> input, ConcurrentQueue<Product> output)
{
while (input.MoveNext())
{
var current = input.Current;
var product = await current.CreateProductAsync();
output.Enqueue(product);
Monitor.Pulse(output);
}
}
您可以使用 Task.WhenAll
,但不是 returning Task.WhenAll
的结果,return 任务集合转换为结果集合。
async Task<IEnumerable<Product>> CreateProductsAsync(IEnumerable<IWorker> workers)
{
var tasks = workers.Select(worker => worker.CreateProductAsync()).ToList();
await Task.WhenAll(tasks);
return tasks.Select(task => task.Result);
}
任务顺序将保持不变。
似乎只需要 return await Task.WhenAll()
就可以了
来自 Task.WhenAll Method (IEnumerable>)
The Task.Result property of the returned task will be set to an array containing all of the results of the supplied tasks in the same order as they were provided...
如果 worker 需要按照创建的顺序一个一个地执行,并且基于另一个功能需要整个 worker 结果集的要求
async Task<IEnumerable<Product>> CreateProductsAsync(IEnumerable<IWorker> workers)
{
var products = new List<product>();
foreach (var worker in workers)
{
product = await worker.CreateProductAsync();
products.Add(product);
}
return products;
}
情况:
在这里你说你需要同步执行此操作,因为 IEnumerable 不支持异步并且要求是你需要一个 IEnumerable<Product>
.
I am now given an IEnumerable workers and am supposed to create an IEnumerable from it that I have to pass to some other function that I cannot alter:
这里你说 整个 产品集需要同时处理,大概是对 void CheckProducts(IEnumerable<Product> products).
This methods needs to check the entire Product set as a whole. It is not possible to subdivide the result.
这里你说可枚举可以产生无限数量的项目
But I must deal with an IEnumerable and, even worse, it is usually quite huge, sometimes it is even unlimited, yielding workers forever. If I knew that its size was decent, I would just call ToArray() on it and use the method above.
所以让我们把这些放在一起。您需要在同步环境中对不定数量的项目进行异步处理,然后将整个集合作为一个整体进行评估...同步。
潜在问题:
- 1:要对一个集合进行整体评估,必须将其完全枚举。要完全枚举一个集合,它必须是有限的。因此,不可能将无限集合作为一个整体来评估。
- 2:在同步和异步之间来回切换强制异步代码同步到运行。从需求的角度来看,这可能没问题,但从技术的角度来看,它可能会导致死锁(也许是不可避免的,我不知道。查一下。我不是专家)。
问题 1 的可能解决方案:
- 1:强制源为
ICollection<T>
而不是IEnumerable<T>
。这强制了有限性。 - 2:改变 CheckProducts 算法以迭代处理,可能产生 中间 结果,同时仍然在内部保持持续聚合。
问题 2 的可能解决方案:
- 1: 使
CheckProducts
方法异步。 - 2:使
CreateProduct...
方法同步。
底线
你不能按照你的要求去做,这听起来像是别人在口述你的要求。他们需要更改一些要求,因为他们要求的(我真的很讨厌使用这个词)是不可能的。您是否误解了某些要求?
IEnumerator
不过,您可以通过在另一个线程上创建产品、将它们添加到 BlockingCollection<T> 并在主线程上产生它们来实现并行性:
static IEnumerable<Product> CreateProducts(IEnumerable<IWorker> workers)
{
var products = new BlockingCollection<Product>(3);
Task.Run(async () => // On the thread pool...
{
foreach (IWorker worker in workers)
{
Product product = await worker.CreateProductAsync(); // Create products serially.
products.Add(product); // Enqueue the product, blocking if the queue is full.
}
products.CompleteAdding(); // Notify GetConsumingEnumerable that we're done.
});
return products.GetConsumingEnumerable();
}
为避免无限内存消耗,您可以选择将队列的容量指定为 BlockingCollection
您现在可以使用 async
、IEnumerable
和 LINQ 执行此操作,但异步之后链中的每个方法都是 Task<T>
,并且您需要使用类似 await Task.WhenAll
最后。您可以在 return Task<T>
的 LINQ 方法中使用异步 lambda。这些你不需要同步等待。
Select
将按顺序启动您的任务,即在 select 枚举每个任务之前,它们甚至不会作为任务存在,并且在您停止枚举后不会继续执行。如果您想 await
单独 await
任务,您也可以 运行 自己 foreach
完成任务。
你可以 break
像其他任何东西一样 foreach
而不是它启动所有这些,所以这也适用于无限枚举。
public async Task Main()
{
// This async method call could also be an async lambda
foreach (var task in GetTasks())
{
var result = await task;
Console.WriteLine($"Result is {result}");
if (result > 5) break;
}
}
private IEnumerable<Task<int>> GetTasks()
{
return GetNumbers().Select(WaitAndDoubleAsync);
}
private async Task<int> WaitAndDoubleAsync(int i)
{
Console.WriteLine($"Waiting {i} seconds asynchronously");
await Task.Delay(TimeSpan.FromSeconds(i));
return i * 2;
}
/// Keeps yielding numbers
private IEnumerable<int> GetNumbers()
{
var i = 0;
while (true) yield return i++;
}
输出以下内容,然后停止:
Waiting 0 seconds asynchronously
Result is 0
Waiting 1 seconds asynchronously
Result is 2
Waiting 2 seconds asynchronously
Result is 4
Waiting 3 seconds asynchronously
Result is 6
重要的是你不能在同一个方法中混合yield
和await
,但是你可以yield
Task
s return从一个使用 await
的方法中编辑出来的非常好,所以你可以通过将它们拆分成单独的方法来一起使用它们。 Select
已经是使用 yield
的方法,因此您可能不需要为此编写自己的方法。
在您的 post 中,您正在寻找 Task<IEnumerable<Product>>
,但您实际可以使用的是 IEnumerable<Task<Product>>
。
你可以更进一步,例如如果你有类似 REST API 的东西,其中一个资源可以链接到其他资源,比如你只是想获得一个组的用户列表,但是当你找到你感兴趣的用户时停止:
public async Task<IEnumerable<Task<User>>> GetUserTasksAsync(int groupId)
{
var group = await GetGroupAsync(groupId);
return group.UserIds.Select(GetUserAsync);
}
foreach (var task in await GetUserTasksAsync(1))
{
var user = await task;
...
}