通过 IEnumerable 和 TPL 数据流流式传输数据
Streaming data via IEnumerable & TPL Dataflow
我正在从上游 API 获取项目,这非常慢。我尝试通过使用 TPL 数据流创建多个连接并将它们组合在一起来加快速度,就像这样;
class Stuff
{
int Id { get; }
}
async Task<Stuff> GetStuffById(int id) => throw new NotImplementedException();
async Task<IEnumerable<Stuff>> GetLotsOfStuff(IEnumerable<int> ids)
{
var bagOfStuff = new ConcurrentBag<Stuff>();
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 5
};
var processor = new ActionBlock<int>(async id =>
{
bagOfStuff.Add(await GetStuffById(id));
}, options);
foreach (int id in ids)
{
processor.Post(id);
}
processor.Complete();
await processor.Completion;
return bagOfStuff.ToArray();
}
问题是我必须等到我查询完 Stuff
的整个集合后才能 return 给调用者。我更喜欢的是,每当多个并行查询中的任何一个 return 是一个项目时,我都会以 yield return
的方式 return 该项目。因此我不需要 return 和 sync Task<IEnumerable<Stuff>>
,我可以 return 和 IEnumerable<Stuff>
并且调用者在任何项目 return 后立即推进迭代。
我试过这样做;
IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 5
};
var processor = new ActionBlock<int>(async id =>
{
yield return await GetStuffById(id);
}, options);
foreach (int id in ids)
{
processor.Post(id);
}
processor.Complete();
processor.Completion.Wait();
yield break;
}
但是我得到一个错误
The yield statement cannot be used inside an anonymous method or lambda expression
如何重组我的代码?
根据您的具体用例,您可能有几种不同的处理方法。但是要根据 TPL-Dataflow 处理项目,您需要将源块更改为 TransformBlock<,>
并将项目流到另一个块以处理您的项目。请注意,现在您可以摆脱收集 ConcurrentBag
并且如果您不关心您收到物品的顺序,请务必将 EnsureOrdered
设置为 false
。此外 link块并传播完成,以确保您的管道在检索到所有项目并随后进行处理后完成。
class Stuff
{
int Id { get; }
}
public class GetStuff
{
async Task<Stuff> GetStuffById(int id) => throw new NotImplementedException();
async Task GetLotsOfStuff(IEnumerable<int> ids)
{
//var bagOfStuff = new ConcurrentBag<Stuff>();
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 5,
EnsureOrdered = false
};
var processor = new TransformBlock<int, Stuff>(id => GetStuffById(id), options);
var handler = new ActionBlock<Stuff>(s => throw new NotImplementedException());
processor.LinkTo(handler, new DataflowLinkOptions() { PropagateCompletion = true });
foreach (int id in ids)
{
processor.Post(id);
}
processor.Complete();
await handler.Completion;
}
}
其他选项可以使您的方法成为可观察的流出 TransformBlock
或使用 IAsyncEnumerable
到 yield return
和异步获取方法。
您可以 return 一个 IEnumerable
,但要这样做您必须阻塞当前线程。您需要一个 TransformBlock
来处理 id,以及一个将异步提供带有 id 的 TransformBlock
的馈送任务。最后当前线程会进入一个阻塞循环,等待produced stuff yield:
static IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
{
using var completionCTS = new CancellationTokenSource();
var processor = new TransformBlock<int, Stuff>(async id =>
{
return await GetStuffById(id);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 5,
BoundedCapacity = 50, // Avoid buffering millions of ids
CancellationToken = completionCTS.Token
});
var feederTask = Task.Run(async () =>
{
try
{
foreach (int id in ids)
if (!await processor.SendAsync(id)) break;
}
finally { processor.Complete(); }
});
try
{
while (processor.OutputAvailableAsync().Result)
while (processor.TryReceive(out var stuff))
yield return stuff;
}
finally // This runs when the caller exits the foreach loop
{
completionCTS.Cancel(); // Cancel the TransformBlock if it's still running
}
Task.WaitAll(feederTask, processor.Completion); // Propagate all exceptions
}
不需要ConcurrentBag
,因为TransformBlock
有一个内部输出缓冲区。棘手的部分是处理调用者将通过提前中断或被异常阻止而放弃 IEnumerable<Stuff>
的枚举的情况。在这种情况下,您不希望 feeder-task 一直使用 id 发送 IEnumerable<int>
直到结束。还好there is a solution。将 yielding 循环包含在 try/finally 块中允许接收此事件的通知,以便可以及时终止 feeder-task。
另一种实现可以通过在单个循环中组合抽取 id、提供块和生成内容来消除对 feeder-task 的需求。在这种情况下,您会希望泵送和产量之间存在滞后。要实现它,MoreLinq's Lag
(or Lead
) 扩展方法可能会很方便。
更新: 这是一个不同的实现,它在同一个循环中枚举和产生。为了实现所需的滞后,可枚举源右填充一些虚拟元素,数量与并发度相等。
此实现接受泛型,而不是 int
和 Stuff
。
public static IEnumerable<TResult> Transform<TSource, TResult>(
IEnumerable<TSource> source, Func<TSource, Task<TResult>> taskFactory,
int degreeOfConcurrency)
{
var processor = new TransformBlock<TSource, TResult>(async item =>
{
return await taskFactory(item);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = degreeOfConcurrency
});
var paddedSource = source.Select(item => (item, true))
.Concat(Enumerable.Repeat((default(TSource), false), degreeOfConcurrency));
int index = -1;
bool completed = false;
foreach (var (item, hasValue) in paddedSource)
{
index++;
if (hasValue) { processor.Post(item); }
else if (!completed) { processor.Complete(); completed = true; }
if (index >= degreeOfConcurrency)
{
if (!processor.OutputAvailableAsync().Result) break; // Blocking call
if (!processor.TryReceive(out var result))
throw new InvalidOperationException(); // Should never happen
yield return result;
}
}
processor.Completion.Wait();
}
用法示例:
IEnumerable<Stuff> lotsOfStuff = Transform(ids, GetStuffById, 5);
两种实现都可以简单地修改为 return 和 IAsyncEnumerable
而不是 IEnumerable
,以避免阻塞调用线程。
我正在从上游 API 获取项目,这非常慢。我尝试通过使用 TPL 数据流创建多个连接并将它们组合在一起来加快速度,就像这样;
class Stuff
{
int Id { get; }
}
async Task<Stuff> GetStuffById(int id) => throw new NotImplementedException();
async Task<IEnumerable<Stuff>> GetLotsOfStuff(IEnumerable<int> ids)
{
var bagOfStuff = new ConcurrentBag<Stuff>();
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 5
};
var processor = new ActionBlock<int>(async id =>
{
bagOfStuff.Add(await GetStuffById(id));
}, options);
foreach (int id in ids)
{
processor.Post(id);
}
processor.Complete();
await processor.Completion;
return bagOfStuff.ToArray();
}
问题是我必须等到我查询完 Stuff
的整个集合后才能 return 给调用者。我更喜欢的是,每当多个并行查询中的任何一个 return 是一个项目时,我都会以 yield return
的方式 return 该项目。因此我不需要 return 和 sync Task<IEnumerable<Stuff>>
,我可以 return 和 IEnumerable<Stuff>
并且调用者在任何项目 return 后立即推进迭代。
我试过这样做;
IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 5
};
var processor = new ActionBlock<int>(async id =>
{
yield return await GetStuffById(id);
}, options);
foreach (int id in ids)
{
processor.Post(id);
}
processor.Complete();
processor.Completion.Wait();
yield break;
}
但是我得到一个错误
The yield statement cannot be used inside an anonymous method or lambda expression
如何重组我的代码?
根据您的具体用例,您可能有几种不同的处理方法。但是要根据 TPL-Dataflow 处理项目,您需要将源块更改为 TransformBlock<,>
并将项目流到另一个块以处理您的项目。请注意,现在您可以摆脱收集 ConcurrentBag
并且如果您不关心您收到物品的顺序,请务必将 EnsureOrdered
设置为 false
。此外 link块并传播完成,以确保您的管道在检索到所有项目并随后进行处理后完成。
class Stuff
{
int Id { get; }
}
public class GetStuff
{
async Task<Stuff> GetStuffById(int id) => throw new NotImplementedException();
async Task GetLotsOfStuff(IEnumerable<int> ids)
{
//var bagOfStuff = new ConcurrentBag<Stuff>();
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 5,
EnsureOrdered = false
};
var processor = new TransformBlock<int, Stuff>(id => GetStuffById(id), options);
var handler = new ActionBlock<Stuff>(s => throw new NotImplementedException());
processor.LinkTo(handler, new DataflowLinkOptions() { PropagateCompletion = true });
foreach (int id in ids)
{
processor.Post(id);
}
processor.Complete();
await handler.Completion;
}
}
其他选项可以使您的方法成为可观察的流出 TransformBlock
或使用 IAsyncEnumerable
到 yield return
和异步获取方法。
您可以 return 一个 IEnumerable
,但要这样做您必须阻塞当前线程。您需要一个 TransformBlock
来处理 id,以及一个将异步提供带有 id 的 TransformBlock
的馈送任务。最后当前线程会进入一个阻塞循环,等待produced stuff yield:
static IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
{
using var completionCTS = new CancellationTokenSource();
var processor = new TransformBlock<int, Stuff>(async id =>
{
return await GetStuffById(id);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 5,
BoundedCapacity = 50, // Avoid buffering millions of ids
CancellationToken = completionCTS.Token
});
var feederTask = Task.Run(async () =>
{
try
{
foreach (int id in ids)
if (!await processor.SendAsync(id)) break;
}
finally { processor.Complete(); }
});
try
{
while (processor.OutputAvailableAsync().Result)
while (processor.TryReceive(out var stuff))
yield return stuff;
}
finally // This runs when the caller exits the foreach loop
{
completionCTS.Cancel(); // Cancel the TransformBlock if it's still running
}
Task.WaitAll(feederTask, processor.Completion); // Propagate all exceptions
}
不需要ConcurrentBag
,因为TransformBlock
有一个内部输出缓冲区。棘手的部分是处理调用者将通过提前中断或被异常阻止而放弃 IEnumerable<Stuff>
的枚举的情况。在这种情况下,您不希望 feeder-task 一直使用 id 发送 IEnumerable<int>
直到结束。还好there is a solution。将 yielding 循环包含在 try/finally 块中允许接收此事件的通知,以便可以及时终止 feeder-task。
另一种实现可以通过在单个循环中组合抽取 id、提供块和生成内容来消除对 feeder-task 的需求。在这种情况下,您会希望泵送和产量之间存在滞后。要实现它,MoreLinq's Lag
(or Lead
) 扩展方法可能会很方便。
更新: 这是一个不同的实现,它在同一个循环中枚举和产生。为了实现所需的滞后,可枚举源右填充一些虚拟元素,数量与并发度相等。
此实现接受泛型,而不是 int
和 Stuff
。
public static IEnumerable<TResult> Transform<TSource, TResult>(
IEnumerable<TSource> source, Func<TSource, Task<TResult>> taskFactory,
int degreeOfConcurrency)
{
var processor = new TransformBlock<TSource, TResult>(async item =>
{
return await taskFactory(item);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = degreeOfConcurrency
});
var paddedSource = source.Select(item => (item, true))
.Concat(Enumerable.Repeat((default(TSource), false), degreeOfConcurrency));
int index = -1;
bool completed = false;
foreach (var (item, hasValue) in paddedSource)
{
index++;
if (hasValue) { processor.Post(item); }
else if (!completed) { processor.Complete(); completed = true; }
if (index >= degreeOfConcurrency)
{
if (!processor.OutputAvailableAsync().Result) break; // Blocking call
if (!processor.TryReceive(out var result))
throw new InvalidOperationException(); // Should never happen
yield return result;
}
}
processor.Completion.Wait();
}
用法示例:
IEnumerable<Stuff> lotsOfStuff = Transform(ids, GetStuffById, 5);
两种实现都可以简单地修改为 return 和 IAsyncEnumerable
而不是 IEnumerable
,以避免阻塞调用线程。