使用 TPL batch/de-parallelise 分离调用
Using TPL to batch/de-parallelise separate invocations
也许 TPL 不是正确的工具,但至少从一个不是特别熟悉它的人来看,它似乎应该有我正在寻找的东西。我愿意接受不使用它的答案。
给出这样的方法:
public Task Submit(IEnumerable<WorkItem> work)
这可能会对项目集合执行昂贵的异步操作。通常调用者将这些项目分批并一次提交尽可能多的项目,并且在这些批之间有相当长的延迟,因此它执行得相当高效。
然而,在某些情况下,不会发生外部批处理,Submit
会快速连续多次调用少量项目(通常只有一个),甚至可能从不同的线程同时调用。
我想做的是推迟处理(同时累积参数),直到有一定的时间没有调用,然后然后执行操作整批,按照最初指定的顺序。
或者换句话说,每次调用该方法时,它都应该将其参数添加到待处理项目列表中,然后从零重新开始延迟,这样在处理任何事情之前都需要一定的空闲时间。
我不想要批次的大小限制(所以我认为 BatchBlock 不是正确的答案),我只想要一个 delay/timeout。我确定调用模式 会 在某个时候成为空闲期。
我不确定是否推迟第一次调用更好,或者是否应该立即开始操作并且仅在操作仍在进行时推迟后续调用。
如果它使问题更容易,我可以让 Submit
return 无效而不是 Task
(即无法观察它何时完成)。
我确定我可以把这样的东西混在一起,但它似乎应该已经存在于某个地方。谁能指出我正确的方向? (不过,我不想使用非核心库。)
好的,由于找不到合适的东西,我最终自己实现了一些东西。似乎可以解决问题。 (我在实际代码中实现的比此处显示的更通用一些,因此我可以更轻松地重用它,但这说明了这个概念。)
private readonly ConcurrentQueue<WorkItem> _Items
= new ConcurrentQueue<WorkItem>();
private CancellationTokenSource _CancelSource;
public async Task Submit(IEnumerable<WorkItem> items)
{
var cancel = ReplacePreviousTasks();
foreach (var item in items)
{
_Items.Enqueue(item);
}
await Task.Delay(TimeSpan.FromMilliseconds(250), cancel.Token);
if (!cancel.IsCancellationRequested)
{
await RunOperation();
}
}
private CancellationTokenSource ReplacePreviousTasks()
{
var cancel = new CancellationTokenSource();
var old = Interlocked.Exchange(ref _CancelSource, cancel);
if (old != null)
{
old.Cancel();
}
return cancel;
}
private async Task RunOperation()
{
var items = new List<WorkItem>();
WorkItem item;
while (_Items.TryDequeue(out item))
{
items.Add(item);
}
// do the operation on items
}
如果250ms内多次提交,则取消较早的提交,250ms后对所有item执行一次操作(从最后一次提交开始计算)。
如果在操作 运行ning 时发生另一个提交,它将继续 运行 而不会取消(它有很小的机会会从稍后的调用中窃取一些项目,但那是好的)。
(技术上检查 cancel.IsCancellationRequested
并不是真正必要的,因为上面的 await
如果在延迟期间被取消,将会抛出异常。但这并没有什么坏处,而且有一个微小的 window 它可能会捕捉到。)
也许 TPL 不是正确的工具,但至少从一个不是特别熟悉它的人来看,它似乎应该有我正在寻找的东西。我愿意接受不使用它的答案。
给出这样的方法:
public Task Submit(IEnumerable<WorkItem> work)
这可能会对项目集合执行昂贵的异步操作。通常调用者将这些项目分批并一次提交尽可能多的项目,并且在这些批之间有相当长的延迟,因此它执行得相当高效。
然而,在某些情况下,不会发生外部批处理,Submit
会快速连续多次调用少量项目(通常只有一个),甚至可能从不同的线程同时调用。
我想做的是推迟处理(同时累积参数),直到有一定的时间没有调用,然后然后执行操作整批,按照最初指定的顺序。
或者换句话说,每次调用该方法时,它都应该将其参数添加到待处理项目列表中,然后从零重新开始延迟,这样在处理任何事情之前都需要一定的空闲时间。
我不想要批次的大小限制(所以我认为 BatchBlock 不是正确的答案),我只想要一个 delay/timeout。我确定调用模式 会 在某个时候成为空闲期。
我不确定是否推迟第一次调用更好,或者是否应该立即开始操作并且仅在操作仍在进行时推迟后续调用。
如果它使问题更容易,我可以让 Submit
return 无效而不是 Task
(即无法观察它何时完成)。
我确定我可以把这样的东西混在一起,但它似乎应该已经存在于某个地方。谁能指出我正确的方向? (不过,我不想使用非核心库。)
好的,由于找不到合适的东西,我最终自己实现了一些东西。似乎可以解决问题。 (我在实际代码中实现的比此处显示的更通用一些,因此我可以更轻松地重用它,但这说明了这个概念。)
private readonly ConcurrentQueue<WorkItem> _Items
= new ConcurrentQueue<WorkItem>();
private CancellationTokenSource _CancelSource;
public async Task Submit(IEnumerable<WorkItem> items)
{
var cancel = ReplacePreviousTasks();
foreach (var item in items)
{
_Items.Enqueue(item);
}
await Task.Delay(TimeSpan.FromMilliseconds(250), cancel.Token);
if (!cancel.IsCancellationRequested)
{
await RunOperation();
}
}
private CancellationTokenSource ReplacePreviousTasks()
{
var cancel = new CancellationTokenSource();
var old = Interlocked.Exchange(ref _CancelSource, cancel);
if (old != null)
{
old.Cancel();
}
return cancel;
}
private async Task RunOperation()
{
var items = new List<WorkItem>();
WorkItem item;
while (_Items.TryDequeue(out item))
{
items.Add(item);
}
// do the operation on items
}
如果250ms内多次提交,则取消较早的提交,250ms后对所有item执行一次操作(从最后一次提交开始计算)。
如果在操作 运行ning 时发生另一个提交,它将继续 运行 而不会取消(它有很小的机会会从稍后的调用中窃取一些项目,但那是好的)。
(技术上检查 cancel.IsCancellationRequested
并不是真正必要的,因为上面的 await
如果在延迟期间被取消,将会抛出异常。但这并没有什么坏处,而且有一个微小的 window 它可能会捕捉到。)