运行 最大并发的多个操作 - 最后 2 个任务未执行
Running multiple operations with max concurrency - 2 last tasks are not executed
我创建了一个 class 允许我并发 运行 多个操作,并带有一个选项来设置最大并发限制。即,如果我有 100 个操作要做,并且我将 maxCurrency
设置为 10,那么在任何给定时间,最多 10 个操作应该同时 运行ning。最终,应该执行所有操作。
代码如下:
public async Task<IReadOnlyCollection<T>> Run<T>(IEnumerable<Func<CancellationToken, Task<T>>> operations, int maxConcurrency, CancellationToken ct)
{
using var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
var results = new ConcurrentBag<T>();
var tasks = new List<Task>();
foreach (var operation in operations)
{
await semaphore.WaitAsync(ct).ConfigureAwait(false);
var task = Task.Factory.StartNew(async () =>
{
try
{
Debug.WriteLine($"Adding new result");
var singleResult = await operation(ct).ConfigureAwait(false);
results.Add(singleResult);
Debug.WriteLine($"Added {singleResult}");
}
finally
{
semaphore.Release();
}
}, ct);
tasks.Add(task);
}
await Task.WhenAll(tasks).ConfigureAwait(false);
Debug.WriteLine($"Completed tasks: {tasks.Count(t => t.IsCompleted)}");
Debug.WriteLine($"Calculated results: {results.Count}");
return results.ToList().AsReadOnly();
}
这是我如何使用它的示例:
var operations = Enumerable.Range(1, 10)
.Select<int, Func<CancellationToken, Task<int>>>(n => async ct =>
{
await Task.Delay(100, ct);
return n;
});
var data = await _sut.Run(operations, 2, CancellationToken.None);
每次执行此操作时,data
集合只有 8 个结果。我希望有 10 个结果。
这是调试日志:
Adding new
Adding new
Added 1
Added 2
Adding new
Adding new
Added 3
Added 4
Adding new
Adding new
Added 5
Adding new
Added 6
Adding new
Added 7
Adding new
Added 8
Adding new
Completed tasks: 10
Calculated results: 8
如您所见:
- 10 个任务已完成
- “Adding new”被记录 10 次
- “已添加 x”被记录 8 次
我不明白为什么最后2个操作没有完成。所有的任务都IsComplete
设置为true
,按我的理解应该是全部执行完了。
这里的问题是 Task.Factory.StartNew
returns 等待 returns 内部任务 .
的任务
它不会给你一个会等待这个内部任务的任务,因此你的问题。
解决此问题的最简单方法是对您创建的任务调用 Unwrap
,这将打开内部任务并让您等待它。
这应该有效:
var task = ....
....
}, ct).Unwrap();
通过这个小改动,您将获得以下输出:
...
Added 9
Added 10
Completed tasks: 10
Calculated results: 10
请注意,我对您的问题的评论仍然有效:
- 您仍然抱有 WhenAll 将等待所有任务的错觉,而实际上除了最后 N 个任务之外的所有任务都已经完成,因为循环本身在前面的任务完成之前不会继续。因此,您应该将同步对象获取移动到您的内部任务中,以便您可以在开始等待它们之前将它们全部排队。
我也相信(虽然我不是 100%知道)使用 SemaphoreSlim 不是一个好方法,因为我相信任何与线程相关的同步对象在与任务相关的工作中使用可能不安全。线程池中的线程在实时任务等待子任务完成时被重用,这意味着这样的线程可能已经拥有来自尚未完成的先前任务的同步对象,因此允许超过你想要的 2 个 运行 运行 在“同时”。 SemaphoreSlim 可以使用,其他同步原语可能不行。
我创建了一个 class 允许我并发 运行 多个操作,并带有一个选项来设置最大并发限制。即,如果我有 100 个操作要做,并且我将 maxCurrency
设置为 10,那么在任何给定时间,最多 10 个操作应该同时 运行ning。最终,应该执行所有操作。
代码如下:
public async Task<IReadOnlyCollection<T>> Run<T>(IEnumerable<Func<CancellationToken, Task<T>>> operations, int maxConcurrency, CancellationToken ct)
{
using var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
var results = new ConcurrentBag<T>();
var tasks = new List<Task>();
foreach (var operation in operations)
{
await semaphore.WaitAsync(ct).ConfigureAwait(false);
var task = Task.Factory.StartNew(async () =>
{
try
{
Debug.WriteLine($"Adding new result");
var singleResult = await operation(ct).ConfigureAwait(false);
results.Add(singleResult);
Debug.WriteLine($"Added {singleResult}");
}
finally
{
semaphore.Release();
}
}, ct);
tasks.Add(task);
}
await Task.WhenAll(tasks).ConfigureAwait(false);
Debug.WriteLine($"Completed tasks: {tasks.Count(t => t.IsCompleted)}");
Debug.WriteLine($"Calculated results: {results.Count}");
return results.ToList().AsReadOnly();
}
这是我如何使用它的示例:
var operations = Enumerable.Range(1, 10)
.Select<int, Func<CancellationToken, Task<int>>>(n => async ct =>
{
await Task.Delay(100, ct);
return n;
});
var data = await _sut.Run(operations, 2, CancellationToken.None);
每次执行此操作时,data
集合只有 8 个结果。我希望有 10 个结果。
这是调试日志:
Adding new
Adding new
Added 1
Added 2
Adding new
Adding new
Added 3
Added 4
Adding new
Adding new
Added 5
Adding new
Added 6
Adding new
Added 7
Adding new
Added 8
Adding new
Completed tasks: 10
Calculated results: 8
如您所见:
- 10 个任务已完成
- “Adding new”被记录 10 次
- “已添加 x”被记录 8 次
我不明白为什么最后2个操作没有完成。所有的任务都IsComplete
设置为true
,按我的理解应该是全部执行完了。
这里的问题是 Task.Factory.StartNew
returns 等待 returns 内部任务 .
它不会给你一个会等待这个内部任务的任务,因此你的问题。
解决此问题的最简单方法是对您创建的任务调用 Unwrap
,这将打开内部任务并让您等待它。
这应该有效:
var task = ....
....
}, ct).Unwrap();
通过这个小改动,您将获得以下输出:
...
Added 9
Added 10
Completed tasks: 10
Calculated results: 10
请注意,我对您的问题的评论仍然有效:
- 您仍然抱有 WhenAll 将等待所有任务的错觉,而实际上除了最后 N 个任务之外的所有任务都已经完成,因为循环本身在前面的任务完成之前不会继续。因此,您应该将同步对象获取移动到您的内部任务中,以便您可以在开始等待它们之前将它们全部排队。
我也相信(虽然我不是 100%知道)使用 SemaphoreSlim 不是一个好方法,因为我相信任何与线程相关的同步对象在与任务相关的工作中使用可能不安全。线程池中的线程在实时任务等待子任务完成时被重用,这意味着这样的线程可能已经拥有来自尚未完成的先前任务的同步对象,因此允许超过你想要的 2 个 运行 运行 在“同时”。SemaphoreSlim 可以使用,其他同步原语可能不行。