使用 TPL batch/de-parallelise 分离调用

Using TPL to batch/de-parallelise separate invocations

也许 TPL 不是正确的工具,但至少从一个不是特别熟悉它的人来看,它似乎应该有我正在寻找的东西。我愿意接受不使用它的答案。

给出这样的方法:

public Task Submit(IEnumerable<WorkItem> work)

这可能会对项目集合执行昂贵的异步操作。通常调用者将这些项目分批并一次提交尽可能多的项目,并且在这些批之间有相当长的延迟,因此它执行得相当高效。

然而,在某些情况下,不会发生外部批处理,Submit 会快速连续多次调用少量项目(通常只有一个),甚至可能从不同的线程同时调用。

我想做的是推迟处理(同时累积参数),直到有一定的时间没有调用,然后然后执行操作整批,按照最初指定的顺序。

或者换句话说,每次调用该方法时,它都应该将其参数添加到待处理项目列表中,然后从零重新开始延迟,这样在处理任何事情之前都需要一定的空闲时间。

我不想要批次的大小限制(所以我认为 BatchBlock 不是正确的答案),我只想要一个 delay/timeout。我确定调用模式 在某个时候成为空闲期。

我不确定是否推迟第一次调用更好,或者是否应该立即开始操作并且仅在操作仍在进行时推迟后续调用。

如果它使问题更容易,我可以让 Submit return 无效而不是 Task(即无法观察它何时完成)。

我确定我可以把这样的东西混在一起,但它似乎应该已经存在于某个地方。谁能指出我正确的方向? (不过,我不想使用非核心库。)

好的,由于找不到合适的东西,我最终自己实现了一些东西。似乎可以解决问题。 (我在实际代码中实现的比此处显示的更通用一些,因此我可以更轻松地重用它,但这说明了这个概念。)

private readonly ConcurrentQueue<WorkItem> _Items
    = new ConcurrentQueue<WorkItem>();
private CancellationTokenSource _CancelSource;

public async Task Submit(IEnumerable<WorkItem> items)
{
    var cancel = ReplacePreviousTasks();

    foreach (var item in items)
    {
        _Items.Enqueue(item);
    }

    await Task.Delay(TimeSpan.FromMilliseconds(250), cancel.Token);
    if (!cancel.IsCancellationRequested)
    {
        await RunOperation();
    }
}

private CancellationTokenSource ReplacePreviousTasks()
{
    var cancel = new CancellationTokenSource();
    var old = Interlocked.Exchange(ref _CancelSource, cancel);
    if (old != null)
    {
        old.Cancel();
    }
    return cancel;
}

private async Task RunOperation()
{
    var items = new List<WorkItem>();
    WorkItem item;
    while (_Items.TryDequeue(out item))
    {
        items.Add(item);
    }

    // do the operation on items
}

如果250ms内多次提交,则取消较早的提交,250ms后对所有item执行一次操作(从最后一次提交开始计算)。

如果在操作 运行ning 时发生另一个提交,它将继续 运行 而不会取消(它有很小的机会会从稍后的调用中窃取一些项目,但那是好的)。

(技术上检查 cancel.IsCancellationRequested 并不是真正必要的,因为上面的 await 如果在延迟期间被取消,将会抛出异常。但这并没有什么坏处,而且有一个微小的 window 它可能会捕捉到。)