.NET 如何报告并行工作负载的(集体)进度,其中每个 object/task 报告其各自的进度?
.NET How to report (collective) progress on parallel workloads where each object/task reports it's own individual progress?
我正在处理并行工作负载,其中每个对象或任务报告其各自的进度,我想报告整个任务的集体进度。
例如,假设我有 10 个工作对象,它们都报告各自的进度。它们包含必须完成的 0-100 "tasks"。
如果我们对每个 Work 对象进行线性迭代,我们可以很容易地报告我们的进度并看到类似这样的输出:
Work item #1 of 10 is currently 1 of 100 tasks completed.
Work item #1 of 10 is currently 2 of 100 tasks completed.
...
Work item #10 of 10 is currently 100 of 100 tasks completed.
但是,当 运行 并行时,输出将如下所示:
Work item #1 of 10 is currently 1 of 100 tasks completed.
Work item #4 of 10 is currently 16 of 100 tasks completed.
Work item #7 of 10 is currently 4 of 100 tasks completed.
...
Work item #10 of 10 is currently 100 of 100 tasks completed.
我要解决的问题是在并行循环中串联所有进度,以便向用户输出更类似于“1/1000”或“10/1000”,代表工作总量已完成,并随着工作的继续更新分子。
我希望有一个适合的解决方案或模式,无论 Async/Await 还是使用任务异步模式——我都在使用——我希望 . NET 框架,我还没有发现。
使用 TAP 中的这个简单(伪代码)示例:
Parallel.ForEach(WorkObject, wo =>
{
// Perhaps each WorkObject has a "ProgressChanged" delegate that fires progress notifications.
wo.ProgressChanged += delegate (int currentProgress, int totalProgress)
{
ReportProgress($"Work item #{wo.ID} of {WorkObject.Count} is currently {currentProgress} of {totalProgress} tasks completed.
};
// Or perhaps using IProgress<T> or Progress?
// wo.PerformWork(/*IProgress<T> or Progress<T>, etc.*/);
});
我们可以并行迭代,当每个线程完成一个工作单元时,进度 updates/notifications 就会进来。
我们如何有效地合并所有 WorkObjects 的进度,以便我们可以报告更统一的“1/1000”完成?
问题是每个 WorkObject 可能有不同数量的 "jobs" 需要完成,我们可能有不同数量的 WorkObject 需要工作。如果在收到每个进度通知时简单地连接所有 WorkObjects 的分子和分母(假设它们在每个工作单元完成后更新),那么在并行工作负载结束时,进度通知将反映类似“1000/100,000”的内容而不是“1000/1000”。
看来我们需要一种方法来跟踪当前进度 X 以及总进度 Y,以便为用户形成关于总进度状态(Y 中的 X 已完成)的连贯消息。
是否有现有的模型(在框架中或其他地方)可以做到这一点?
我目前的想法是创建一个数据结构来记录并行执行的每个线程的线程 ID,然后跟踪每个线程在该数据结构中的进度(作为 X/Y)值,最后作为每个线程发布进度更新,迭代数据结构以从每个线程求和 X/Y 以生成总计 "X/Y" 以显示给用户。
但开发人员每天都在面对这个问题,所以一定有其他方法吗?
以下是一种可能的方法。与我上面描述的类似,除了我将 "work" 外包给任务并从初始线程上下文中提取 ReportProgress。
首先,几个类。我正在使用 Random 来决定每个作业需要多长时间以及每个 WorkObject 中有多少个作业。作业通过紧密循环模拟高 CPU 负载。您将使用自己的对象(以及实际有用的工作)。
public class Job
{
private readonly TimeSpan timeForJobToTake;
public Job(TimeSpan timeForJobToTake)
{
this.timeForJobToTake = timeForJobToTake;
}
public void DoJob()
{
DateTime endTime = DateTime.UtcNow.Add(this.timeForJobToTake);
while (DateTime.UtcNow < endTime)
{
// emulate high CPU load during job
}
}
}
public class WorkObject
{
private readonly List<Job> jobs = new List<Job>();
public WorkObject(Random random)
{
int jobsToCreate = random.Next(1, 10);
for (int i = 0; i < jobsToCreate; i++)
{
Job job = new Job(TimeSpan.FromMilliseconds(random.Next(100, 200)));
this.jobs.Add(job);
}
}
public int JobCount => this.jobs.Count;
public void PerformWork()
{
foreach (Job job in this.jobs)
{
job.DoJob();
}
}
}
然后你可以做类似下面的事情(控制台应用程序,但代码可以在其他上下文中工作):
internal class Program
{
private static readonly object syncObj = new object();
private static int lastNumerator;
private static int numerator;
private static int denominator;
private static void ReportProgress()
{
int currentNumerator = numerator;
// Don't emit progress if nothing changed
if (currentNumerator == lastNumerator) return;
Console.WriteLine($"{currentNumerator} of {denominator}");
lastNumerator = currentNumerator;
}
private static void Main(string[] args)
{
MainAsync().Wait();
Console.ReadLine();
}
private static async Task MainAsync()
{
// Setup example objects
Random random = new Random();
List<WorkObject> workObjects = new List<WorkObject>();
int numberOfWorkObjects = random.Next(50, 100);
for (int i = 0; i < numberOfWorkObjects; i++)
{
WorkObject workObject = new WorkObject(random);
denominator += workObject.JobCount;
workObjects.Add(workObject);
}
// The CancellationTokenSource is used to immediately abort the progress reporting once the work is complete
CancellationTokenSource progressReportCancellationTokenSource = new CancellationTokenSource();
Task workTask = Task.Run(() =>
{
Parallel.ForEach(workObjects,
wo =>
{
wo.PerformWork();
lock (syncObj)
{
numerator += wo.JobCount;
}
});
progressReportCancellationTokenSource.Cancel();
});
while (!workTask.IsCompleted)
{
try
{
ReportProgress();
await Task.Delay(250, progressReportCancellationTokenSource.Token);
}
catch (TaskCanceledException)
{
break;
}
}
await workTask;
ReportProgress();
}
}
我最终创建了一个 class 来管理线程进度;这是我想出的:
// A Parallel Progress Manager is designed to collect progress information from multiple sources and provide a total sum of progress.
// For example, if 3 objects are going to perform some work in parallel, and the first object has 10 tasks, the second has 100, and the last has 1000,
// when executing in parallel, it isn't useful to have each task fire a ProgressChanged() event (or something similar), as it would result in the progress
// being returned something like 0/10, 1/10, 2/10, 0/100, 3/10, 1/100, 0/1000, etc. (As each thread executes independently.)
//
// Instead, this class aggregates progress and provides a total sum of progress: 0/1110, 1/1110, etc.
//
// NOTE: The intention of this class is to manage parallelized workloads across numerous jobs. For example, operating in parallel against 3 different objects
// that all report progress independently, such as Paralle.ForEach(IEnumerable<T>). This is not suggested for parallelized workloads of a single job, such as
// Parallel.For(i, 100)—in this case, it is recommended to update progress using Interlocked.Increment() or a lock() on a synchronization object as one would normally.
// Example:
//
// ParallelProgressManager ppm = new ParallelProgressManager();
//
// Parallel.ForEach(IEnumerable<T>, t =>
// {
// t.ProgressChanged += delegate (long current, long total, bool indeterminate, string message)
// {
// lock(ppm)
// {
// var x = ppm.SetGetProgress(t.GetHashCode(), current, total);
//
// ReportProgress(x.Item1, x.Item2, false, $"Working... {x.Item1} / {x.Item2}");
// }
// }
// });
using System;
using System.Collections.Generic;
namespace Threading
{
/// <summary>
/// A Parallel Progress Manager used to aggregate and sum progress across multiple objects working in parallel.
/// </summary>
public class ParallelProgressManager
{
/// <summary>
/// The progress class contains current and total progress and
/// </summary>
protected class Progress
{
public long Current { get; set; } = 0;
public long Total { get; set; } = 0;
}
/// <summary>
/// The ProgressDictionary associates each working object's Hash Code with it's current progress (via a Progress object.)
/// This way an object can operate in parallel and as progress updates come in, the last update is replaced by the new one.
/// We can then sum the "current" and "total" to produce an overall progress value.
/// </summary>
private Dictionary<int, Progress> ProgressDictionary { get; set; } = new Dictionary<int, Progress>();
/// <summary>
/// Sets an object's progress via it's Hash Code. If the object isn't recognized, a new entry will be made for it. If it is recognized,
/// it's progress will be updated accordingly.
/// </summary>
/// <param name="hashCode">
/// The Hash Code of the object (.GetHashCode()) that's reporting progress. The Hash Code is used to distinguish the objects to manage progress of.
/// </param>
/// <param name="current">
/// The current progress.
/// </param>
/// <param name="total">
/// The total progress.
/// </param>
public void SetProgress(int hashCode, long current, long total)
{
if (!ProgressDictionary.ContainsKey(hashCode))
ProgressDictionary.Add(hashCode, new Progress() { Current = current, Total = total });
else
{
ProgressDictionary[hashCode].Current = current;
ProgressDictionary[hashCode].Total = total;
}
}
/// <summary>
/// Retrieves the total progress of all objects currently being managed.
/// </summary>
/// <returns>
/// A Tuple where the first value represents the summed current progress, and the second value represents the summed total progress.
/// </returns>
public Tuple<long, long> GetProgress()
{
long c = 0;
long t = 0;
foreach (var p in ProgressDictionary)
{
c += p.Value.Current;
t += p.Value.Total;
}
return Tuple.Create(c, t);
}
/// <summary>
/// Sets progress for the provided object and retrieves an updated total progress. This is equivalent to calling SetProgress() and then calling
/// GetProgress() immediately after.
/// </summary>
/// <param name="hashCode"></param>
/// <param name="currentStep"></param>
/// <param name="totalSteps"></param>
/// <returns></returns>
public Tuple<long, long> SetGetProgress(int hashCode, long currentStep, long totalSteps)
{
SetProgress(hashCode, currentStep, totalSteps);
return GetProgress();
}
}
}
我正在处理并行工作负载,其中每个对象或任务报告其各自的进度,我想报告整个任务的集体进度。
例如,假设我有 10 个工作对象,它们都报告各自的进度。它们包含必须完成的 0-100 "tasks"。
如果我们对每个 Work 对象进行线性迭代,我们可以很容易地报告我们的进度并看到类似这样的输出:
Work item #1 of 10 is currently 1 of 100 tasks completed.
Work item #1 of 10 is currently 2 of 100 tasks completed.
...
Work item #10 of 10 is currently 100 of 100 tasks completed.
但是,当 运行 并行时,输出将如下所示:
Work item #1 of 10 is currently 1 of 100 tasks completed.
Work item #4 of 10 is currently 16 of 100 tasks completed.
Work item #7 of 10 is currently 4 of 100 tasks completed.
...
Work item #10 of 10 is currently 100 of 100 tasks completed.
我要解决的问题是在并行循环中串联所有进度,以便向用户输出更类似于“1/1000”或“10/1000”,代表工作总量已完成,并随着工作的继续更新分子。
我希望有一个适合的解决方案或模式,无论 Async/Await 还是使用任务异步模式——我都在使用——我希望 . NET 框架,我还没有发现。
使用 TAP 中的这个简单(伪代码)示例:
Parallel.ForEach(WorkObject, wo =>
{
// Perhaps each WorkObject has a "ProgressChanged" delegate that fires progress notifications.
wo.ProgressChanged += delegate (int currentProgress, int totalProgress)
{
ReportProgress($"Work item #{wo.ID} of {WorkObject.Count} is currently {currentProgress} of {totalProgress} tasks completed.
};
// Or perhaps using IProgress<T> or Progress?
// wo.PerformWork(/*IProgress<T> or Progress<T>, etc.*/);
});
我们可以并行迭代,当每个线程完成一个工作单元时,进度 updates/notifications 就会进来。
我们如何有效地合并所有 WorkObjects 的进度,以便我们可以报告更统一的“1/1000”完成?
问题是每个 WorkObject 可能有不同数量的 "jobs" 需要完成,我们可能有不同数量的 WorkObject 需要工作。如果在收到每个进度通知时简单地连接所有 WorkObjects 的分子和分母(假设它们在每个工作单元完成后更新),那么在并行工作负载结束时,进度通知将反映类似“1000/100,000”的内容而不是“1000/1000”。
看来我们需要一种方法来跟踪当前进度 X 以及总进度 Y,以便为用户形成关于总进度状态(Y 中的 X 已完成)的连贯消息。
是否有现有的模型(在框架中或其他地方)可以做到这一点?
我目前的想法是创建一个数据结构来记录并行执行的每个线程的线程 ID,然后跟踪每个线程在该数据结构中的进度(作为 X/Y)值,最后作为每个线程发布进度更新,迭代数据结构以从每个线程求和 X/Y 以生成总计 "X/Y" 以显示给用户。
但开发人员每天都在面对这个问题,所以一定有其他方法吗?
以下是一种可能的方法。与我上面描述的类似,除了我将 "work" 外包给任务并从初始线程上下文中提取 ReportProgress。
首先,几个类。我正在使用 Random 来决定每个作业需要多长时间以及每个 WorkObject 中有多少个作业。作业通过紧密循环模拟高 CPU 负载。您将使用自己的对象(以及实际有用的工作)。
public class Job
{
private readonly TimeSpan timeForJobToTake;
public Job(TimeSpan timeForJobToTake)
{
this.timeForJobToTake = timeForJobToTake;
}
public void DoJob()
{
DateTime endTime = DateTime.UtcNow.Add(this.timeForJobToTake);
while (DateTime.UtcNow < endTime)
{
// emulate high CPU load during job
}
}
}
public class WorkObject
{
private readonly List<Job> jobs = new List<Job>();
public WorkObject(Random random)
{
int jobsToCreate = random.Next(1, 10);
for (int i = 0; i < jobsToCreate; i++)
{
Job job = new Job(TimeSpan.FromMilliseconds(random.Next(100, 200)));
this.jobs.Add(job);
}
}
public int JobCount => this.jobs.Count;
public void PerformWork()
{
foreach (Job job in this.jobs)
{
job.DoJob();
}
}
}
然后你可以做类似下面的事情(控制台应用程序,但代码可以在其他上下文中工作):
internal class Program
{
private static readonly object syncObj = new object();
private static int lastNumerator;
private static int numerator;
private static int denominator;
private static void ReportProgress()
{
int currentNumerator = numerator;
// Don't emit progress if nothing changed
if (currentNumerator == lastNumerator) return;
Console.WriteLine($"{currentNumerator} of {denominator}");
lastNumerator = currentNumerator;
}
private static void Main(string[] args)
{
MainAsync().Wait();
Console.ReadLine();
}
private static async Task MainAsync()
{
// Setup example objects
Random random = new Random();
List<WorkObject> workObjects = new List<WorkObject>();
int numberOfWorkObjects = random.Next(50, 100);
for (int i = 0; i < numberOfWorkObjects; i++)
{
WorkObject workObject = new WorkObject(random);
denominator += workObject.JobCount;
workObjects.Add(workObject);
}
// The CancellationTokenSource is used to immediately abort the progress reporting once the work is complete
CancellationTokenSource progressReportCancellationTokenSource = new CancellationTokenSource();
Task workTask = Task.Run(() =>
{
Parallel.ForEach(workObjects,
wo =>
{
wo.PerformWork();
lock (syncObj)
{
numerator += wo.JobCount;
}
});
progressReportCancellationTokenSource.Cancel();
});
while (!workTask.IsCompleted)
{
try
{
ReportProgress();
await Task.Delay(250, progressReportCancellationTokenSource.Token);
}
catch (TaskCanceledException)
{
break;
}
}
await workTask;
ReportProgress();
}
}
我最终创建了一个 class 来管理线程进度;这是我想出的:
// A Parallel Progress Manager is designed to collect progress information from multiple sources and provide a total sum of progress.
// For example, if 3 objects are going to perform some work in parallel, and the first object has 10 tasks, the second has 100, and the last has 1000,
// when executing in parallel, it isn't useful to have each task fire a ProgressChanged() event (or something similar), as it would result in the progress
// being returned something like 0/10, 1/10, 2/10, 0/100, 3/10, 1/100, 0/1000, etc. (As each thread executes independently.)
//
// Instead, this class aggregates progress and provides a total sum of progress: 0/1110, 1/1110, etc.
//
// NOTE: The intention of this class is to manage parallelized workloads across numerous jobs. For example, operating in parallel against 3 different objects
// that all report progress independently, such as Paralle.ForEach(IEnumerable<T>). This is not suggested for parallelized workloads of a single job, such as
// Parallel.For(i, 100)—in this case, it is recommended to update progress using Interlocked.Increment() or a lock() on a synchronization object as one would normally.
// Example:
//
// ParallelProgressManager ppm = new ParallelProgressManager();
//
// Parallel.ForEach(IEnumerable<T>, t =>
// {
// t.ProgressChanged += delegate (long current, long total, bool indeterminate, string message)
// {
// lock(ppm)
// {
// var x = ppm.SetGetProgress(t.GetHashCode(), current, total);
//
// ReportProgress(x.Item1, x.Item2, false, $"Working... {x.Item1} / {x.Item2}");
// }
// }
// });
using System;
using System.Collections.Generic;
namespace Threading
{
/// <summary>
/// A Parallel Progress Manager used to aggregate and sum progress across multiple objects working in parallel.
/// </summary>
public class ParallelProgressManager
{
/// <summary>
/// The progress class contains current and total progress and
/// </summary>
protected class Progress
{
public long Current { get; set; } = 0;
public long Total { get; set; } = 0;
}
/// <summary>
/// The ProgressDictionary associates each working object's Hash Code with it's current progress (via a Progress object.)
/// This way an object can operate in parallel and as progress updates come in, the last update is replaced by the new one.
/// We can then sum the "current" and "total" to produce an overall progress value.
/// </summary>
private Dictionary<int, Progress> ProgressDictionary { get; set; } = new Dictionary<int, Progress>();
/// <summary>
/// Sets an object's progress via it's Hash Code. If the object isn't recognized, a new entry will be made for it. If it is recognized,
/// it's progress will be updated accordingly.
/// </summary>
/// <param name="hashCode">
/// The Hash Code of the object (.GetHashCode()) that's reporting progress. The Hash Code is used to distinguish the objects to manage progress of.
/// </param>
/// <param name="current">
/// The current progress.
/// </param>
/// <param name="total">
/// The total progress.
/// </param>
public void SetProgress(int hashCode, long current, long total)
{
if (!ProgressDictionary.ContainsKey(hashCode))
ProgressDictionary.Add(hashCode, new Progress() { Current = current, Total = total });
else
{
ProgressDictionary[hashCode].Current = current;
ProgressDictionary[hashCode].Total = total;
}
}
/// <summary>
/// Retrieves the total progress of all objects currently being managed.
/// </summary>
/// <returns>
/// A Tuple where the first value represents the summed current progress, and the second value represents the summed total progress.
/// </returns>
public Tuple<long, long> GetProgress()
{
long c = 0;
long t = 0;
foreach (var p in ProgressDictionary)
{
c += p.Value.Current;
t += p.Value.Total;
}
return Tuple.Create(c, t);
}
/// <summary>
/// Sets progress for the provided object and retrieves an updated total progress. This is equivalent to calling SetProgress() and then calling
/// GetProgress() immediately after.
/// </summary>
/// <param name="hashCode"></param>
/// <param name="currentStep"></param>
/// <param name="totalSteps"></param>
/// <returns></returns>
public Tuple<long, long> SetGetProgress(int hashCode, long currentStep, long totalSteps)
{
SetProgress(hashCode, currentStep, totalSteps);
return GetProgress();
}
}
}