运行 多个任务半并行
Run multiple tasks in semiparallel
我有一个任务列表,其中每个任务都有两个子任务。我只想在上一个任务完成第一个子任务后开始下一个任务。 (第一个子任务对单个 http api 调用了大约 1000 次,如果我称之为“太多”,它似乎会下降 if/throttles,第二个任务相当长-运行 cpu-绑定任务).
我的想法类似于以下示例代码(在实际情况下任务列表增长到至少 300 个):
class Program
{
static async Task Main(string[] args)
{
var test = new TestMultiTask();
await test.TestListOfTasks();
Console.WriteLine("Finished");
Console.ReadLine();
}
}
public class TestMultiTask
{
public List<Task> Tasks { get; private set; }
public async Task TestListOfTasks()
{
Tasks = new List<Task>();
Tasks.Add(Task2SubTask(new Progress<MyProgress>(ReportProgress), 0));
await Task.WhenAll(Tasks);
Console.WriteLine("Finished TestListOfTasks");
}
private void ReportProgress(MyProgress obj)
{
if (obj.ProgressNo < 10)
{
Tasks.Add(Task2SubTask(new Progress<MyProgress>(ReportProgress), obj.ProgressNo + 1));
}
}
private async Task Task2SubTask(IProgress<MyProgress> progress, int i)
{
Console.WriteLine($"Task {i}, started.");
await Task.Delay(700); // simulating getting data from web api (abt- 1000 calls)
Console.WriteLine($"Task {i}, completed subtask 1.");
progress.Report(new MyProgress() { Task1Done = true, ProgressNo = i }); // now the next task can start getting data form web api.
await Task.Run(() => Task.Delay(1000)); // Process the data collected (in this task) from the web api
Console.WriteLine($"Task {i} done."); // return processed data.
}
}
然而,这至少有一个缺陷 - Program.Main
在列表中的第一个任务完成后立即继续,即使列表已展开。
有谁知道这样做的更聪明的方法? (也许是一种完全不同的方法?)
编辑:
感谢您的评论.. 现在我尝试对代码进行评论以使其更加清晰。
首先感谢大家的评论。我尝试了@Evk 的建议并得到了以下代码...
public class TestMultiTask2
{
public List<Task> Tasks { get; private set; }
public async Task TestListOfTasks()
{
var workers = new List<Worker>();
var crunchingTasks = new List<Task>();
var n = 10;
for (int i = 0; i < n; i++)
{
var w = new Worker(i);
workers.Add(w);
await w.GetDataAsync(new Progress<MyProgress>(ReportProgress));
crunchingTasks.Add(w.CrunchData(new Progress<MyProgress>(ReportProgress)));
}
await Task.WhenAll(crunchingTasks);
Console.WriteLine($"Worker 0 data: {string.Join(", ", workers[0].Datas)}");
Console.WriteLine($"Worker {n-1} data: {string.Join(", ", workers[n-1].Datas)}");
Console.WriteLine("Finished TestListOfTasks");
}
private void ReportProgress(MyProgress obj)
{
if (obj.Task2Started)
{
Console.WriteLine($"Task no. {obj.TaskNo} has started chrunching data: {obj.Task2Started}");
}
else if(obj.Task2Done)
{
Console.WriteLine($"Task no. {obj.TaskNo} has finished chrunching data: {obj.Task2Done}");
}
else if (obj.Task1Done)
{
Console.WriteLine($"Task no. {obj.TaskNo} has finished getting data: {obj.Task1Done}");
}
}
}
public class MyProgress
{
public int TaskNo { get; set; }
public bool Task1Done { get; set; }
public bool Task2Done { get; internal set; }
public bool Task2Started { get; internal set; }
}
public class Worker
{
public int ID { get; set; }
public Worker(int id)
{
ID = id;
}
public List<int> Datas { get; set; } = new List<int>() { 0, 0, 0 };
public async Task GetDataAsync(IProgress<MyProgress> progress)
{
await Task.Delay(500);
Datas = new List<int>() { 1, 2, 3 };
progress.Report(new MyProgress() { TaskNo = ID, Task1Done = true });
}
public async Task CrunchData(IProgress<MyProgress> progress)
{
progress.Report(new MyProgress() { TaskNo = ID, Task2Started = true });
await Task.Run(async () => { await Task.Delay(5000); Datas.Reverse(); });
progress.Report(new MyProgress() { TaskNo = ID, Task2Done = true });
}
}
class Program
{
static async Task Main(string[] args)
{
var test = new TestMultiTask2();
await test.TestListOfTasks();
Console.WriteLine("Finished");
Console.ReadLine();
}
}
我有一个任务列表,其中每个任务都有两个子任务。我只想在上一个任务完成第一个子任务后开始下一个任务。 (第一个子任务对单个 http api 调用了大约 1000 次,如果我称之为“太多”,它似乎会下降 if/throttles,第二个任务相当长-运行 cpu-绑定任务).
我的想法类似于以下示例代码(在实际情况下任务列表增长到至少 300 个):
class Program
{
static async Task Main(string[] args)
{
var test = new TestMultiTask();
await test.TestListOfTasks();
Console.WriteLine("Finished");
Console.ReadLine();
}
}
public class TestMultiTask
{
public List<Task> Tasks { get; private set; }
public async Task TestListOfTasks()
{
Tasks = new List<Task>();
Tasks.Add(Task2SubTask(new Progress<MyProgress>(ReportProgress), 0));
await Task.WhenAll(Tasks);
Console.WriteLine("Finished TestListOfTasks");
}
private void ReportProgress(MyProgress obj)
{
if (obj.ProgressNo < 10)
{
Tasks.Add(Task2SubTask(new Progress<MyProgress>(ReportProgress), obj.ProgressNo + 1));
}
}
private async Task Task2SubTask(IProgress<MyProgress> progress, int i)
{
Console.WriteLine($"Task {i}, started.");
await Task.Delay(700); // simulating getting data from web api (abt- 1000 calls)
Console.WriteLine($"Task {i}, completed subtask 1.");
progress.Report(new MyProgress() { Task1Done = true, ProgressNo = i }); // now the next task can start getting data form web api.
await Task.Run(() => Task.Delay(1000)); // Process the data collected (in this task) from the web api
Console.WriteLine($"Task {i} done."); // return processed data.
}
}
然而,这至少有一个缺陷 - Program.Main
在列表中的第一个任务完成后立即继续,即使列表已展开。
有谁知道这样做的更聪明的方法? (也许是一种完全不同的方法?)
编辑: 感谢您的评论.. 现在我尝试对代码进行评论以使其更加清晰。
首先感谢大家的评论。我尝试了@Evk 的建议并得到了以下代码...
public class TestMultiTask2
{
public List<Task> Tasks { get; private set; }
public async Task TestListOfTasks()
{
var workers = new List<Worker>();
var crunchingTasks = new List<Task>();
var n = 10;
for (int i = 0; i < n; i++)
{
var w = new Worker(i);
workers.Add(w);
await w.GetDataAsync(new Progress<MyProgress>(ReportProgress));
crunchingTasks.Add(w.CrunchData(new Progress<MyProgress>(ReportProgress)));
}
await Task.WhenAll(crunchingTasks);
Console.WriteLine($"Worker 0 data: {string.Join(", ", workers[0].Datas)}");
Console.WriteLine($"Worker {n-1} data: {string.Join(", ", workers[n-1].Datas)}");
Console.WriteLine("Finished TestListOfTasks");
}
private void ReportProgress(MyProgress obj)
{
if (obj.Task2Started)
{
Console.WriteLine($"Task no. {obj.TaskNo} has started chrunching data: {obj.Task2Started}");
}
else if(obj.Task2Done)
{
Console.WriteLine($"Task no. {obj.TaskNo} has finished chrunching data: {obj.Task2Done}");
}
else if (obj.Task1Done)
{
Console.WriteLine($"Task no. {obj.TaskNo} has finished getting data: {obj.Task1Done}");
}
}
}
public class MyProgress
{
public int TaskNo { get; set; }
public bool Task1Done { get; set; }
public bool Task2Done { get; internal set; }
public bool Task2Started { get; internal set; }
}
public class Worker
{
public int ID { get; set; }
public Worker(int id)
{
ID = id;
}
public List<int> Datas { get; set; } = new List<int>() { 0, 0, 0 };
public async Task GetDataAsync(IProgress<MyProgress> progress)
{
await Task.Delay(500);
Datas = new List<int>() { 1, 2, 3 };
progress.Report(new MyProgress() { TaskNo = ID, Task1Done = true });
}
public async Task CrunchData(IProgress<MyProgress> progress)
{
progress.Report(new MyProgress() { TaskNo = ID, Task2Started = true });
await Task.Run(async () => { await Task.Delay(5000); Datas.Reverse(); });
progress.Report(new MyProgress() { TaskNo = ID, Task2Done = true });
}
}
class Program
{
static async Task Main(string[] args)
{
var test = new TestMultiTask2();
await test.TestListOfTasks();
Console.WriteLine("Finished");
Console.ReadLine();
}
}