异步等待和并行
Async await and parallel
我对 async/await 如何并行工作感到有点困惑,所以我在这里做了一个测试代码:
我尝试发送 6 个我用列表模拟的任务。
此任务中的每一个都将执行其他 3 个子任务。
你可以copy/paste进行测试
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
namespace ConsoleApplication1
{
class Program
{
static void Main(string[] args)
{
//job simulation
Func<int, string, Tuple<int, string>> tc = Tuple.Create;
var input = new List<Tuple<int, string>>{
tc( 6000, "task 1" ),
tc( 5000, "task 2" ),
tc( 1000, "task 3" ),
tc( 1000, "task 4" ),
tc( 1000, "task 5" ),
tc( 1000, "task 6" )
};
List<Tuple<int, string>> JobsList = new List<Tuple<int, string>>(input);
//paralelism atempt
List<Task> TaskLauncher = new List<Task>();
Parallel.ForEach<Tuple<int, string>>(JobsList, item => JobDispatcher(item.Item1, item.Item2));
Console.ReadLine();
}
public static async Task JobDispatcher(int time , string query)
{
List<Task> TList = new List<Task>();
Task<string> T1 = SubTask1(time, query);
Task<string> T2 = SubTask2(time, query);
Task<string> T3 = SubTask3(time, query);
TList.Add(T1);
TList.Add(T2);
TList.Add(T3);
Console.WriteLine("{0} Launched ", query);
await Task.WhenAll(TList.ToArray());
Console.WriteLine(T1.Result);
Console.WriteLine(T2.Result);
Console.WriteLine(T3.Result);
}
public static async Task<string> SubTask1(int time, string query)
{
//somework
Thread.Sleep(time);
return query + "Finshed SubTask1";
}
public static async Task<string> SubTask2(int time, string query)
{
//somework
Thread.Sleep(time);
return query + "Finshed SubTask2";
}
public static async Task<string> SubTask3(int time, string query)
{
//somework
Thread.Sleep(time);
return query + "Finshed SubTask3";
}
}
}
理想情况下,我应该在发布时阅读:
task 1 launched
task 2 launched
task 3 launched
task 4 launched
task 5 launched
task 6 launched
然后此时所有任务运行 6*3 = 18 个线程同时运行
但它不是这里发生的事情似乎是同步执行的。
结果如下:
用 async/await 编写可以作为 18 个并行线程启动任务和子任务的东西的正确方法是什么?
试试这个示例代码。请注意,它在大约 6 秒内完成,这表明所有任务都是 运行 异步的:
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
class Program
{
static void Main()
{
// ThreadPool throttling may cause the speed with which
// the threads are launched to be throttled.
// You can avoid that by uncommenting the following line,
// but that is considered bad form:
// ThreadPool.SetMinThreads(20, 20);
var sw = Stopwatch.StartNew();
Console.WriteLine("Waiting for all tasks to complete");
RunWorkers().Wait();
Console.WriteLine("All tasks completed in " + sw.Elapsed);
}
public static async Task RunWorkers()
{
await Task.WhenAll(
JobDispatcher(6000, "task 1"),
JobDispatcher(5000, "task 2"),
JobDispatcher(4000, "task 3"),
JobDispatcher(3000, "task 4"),
JobDispatcher(2000, "task 5"),
JobDispatcher(1000, "task 6")
);
}
public static async Task JobDispatcher(int time, string query)
{
var results = await Task.WhenAll(
worker(time, query + ": Subtask 1"),
worker(time, query + ": Subtask 2"),
worker(time, query + ": Subtask 3")
);
Console.WriteLine(string.Join("\n", results));
}
static async Task<string> worker(int time, string query)
{
return await Task.Run(() =>
{
Console.WriteLine("Starting worker " + query);
Thread.Sleep(time);
Console.WriteLine("Completed worker " + query);
return query + ": " + time + ", thread id: " + Thread.CurrentThread.ManagedThreadId;
});
}
}
}
下面是如何使用任务数组,在 RunWorkers()
:
public static async Task RunWorkers()
{
Task[] tasks = new Task[6];
for (int i = 0; i < 6; ++i)
tasks[i] = JobDispatcher(1000 + i*1000, "task " + i);
await Task.WhenAll(tasks);
}
我对 async/await 如何并行工作感到有点困惑,所以我在这里做了一个测试代码: 我尝试发送 6 个我用列表模拟的任务。 此任务中的每一个都将执行其他 3 个子任务。
你可以copy/paste进行测试
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
namespace ConsoleApplication1
{
class Program
{
static void Main(string[] args)
{
//job simulation
Func<int, string, Tuple<int, string>> tc = Tuple.Create;
var input = new List<Tuple<int, string>>{
tc( 6000, "task 1" ),
tc( 5000, "task 2" ),
tc( 1000, "task 3" ),
tc( 1000, "task 4" ),
tc( 1000, "task 5" ),
tc( 1000, "task 6" )
};
List<Tuple<int, string>> JobsList = new List<Tuple<int, string>>(input);
//paralelism atempt
List<Task> TaskLauncher = new List<Task>();
Parallel.ForEach<Tuple<int, string>>(JobsList, item => JobDispatcher(item.Item1, item.Item2));
Console.ReadLine();
}
public static async Task JobDispatcher(int time , string query)
{
List<Task> TList = new List<Task>();
Task<string> T1 = SubTask1(time, query);
Task<string> T2 = SubTask2(time, query);
Task<string> T3 = SubTask3(time, query);
TList.Add(T1);
TList.Add(T2);
TList.Add(T3);
Console.WriteLine("{0} Launched ", query);
await Task.WhenAll(TList.ToArray());
Console.WriteLine(T1.Result);
Console.WriteLine(T2.Result);
Console.WriteLine(T3.Result);
}
public static async Task<string> SubTask1(int time, string query)
{
//somework
Thread.Sleep(time);
return query + "Finshed SubTask1";
}
public static async Task<string> SubTask2(int time, string query)
{
//somework
Thread.Sleep(time);
return query + "Finshed SubTask2";
}
public static async Task<string> SubTask3(int time, string query)
{
//somework
Thread.Sleep(time);
return query + "Finshed SubTask3";
}
}
}
理想情况下,我应该在发布时阅读:
task 1 launched
task 2 launched
task 3 launched
task 4 launched
task 5 launched
task 6 launched
然后此时所有任务运行 6*3 = 18 个线程同时运行 但它不是这里发生的事情似乎是同步执行的。
结果如下:
用 async/await 编写可以作为 18 个并行线程启动任务和子任务的东西的正确方法是什么?
试试这个示例代码。请注意,它在大约 6 秒内完成,这表明所有任务都是 运行 异步的:
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
class Program
{
static void Main()
{
// ThreadPool throttling may cause the speed with which
// the threads are launched to be throttled.
// You can avoid that by uncommenting the following line,
// but that is considered bad form:
// ThreadPool.SetMinThreads(20, 20);
var sw = Stopwatch.StartNew();
Console.WriteLine("Waiting for all tasks to complete");
RunWorkers().Wait();
Console.WriteLine("All tasks completed in " + sw.Elapsed);
}
public static async Task RunWorkers()
{
await Task.WhenAll(
JobDispatcher(6000, "task 1"),
JobDispatcher(5000, "task 2"),
JobDispatcher(4000, "task 3"),
JobDispatcher(3000, "task 4"),
JobDispatcher(2000, "task 5"),
JobDispatcher(1000, "task 6")
);
}
public static async Task JobDispatcher(int time, string query)
{
var results = await Task.WhenAll(
worker(time, query + ": Subtask 1"),
worker(time, query + ": Subtask 2"),
worker(time, query + ": Subtask 3")
);
Console.WriteLine(string.Join("\n", results));
}
static async Task<string> worker(int time, string query)
{
return await Task.Run(() =>
{
Console.WriteLine("Starting worker " + query);
Thread.Sleep(time);
Console.WriteLine("Completed worker " + query);
return query + ": " + time + ", thread id: " + Thread.CurrentThread.ManagedThreadId;
});
}
}
}
下面是如何使用任务数组,在 RunWorkers()
:
public static async Task RunWorkers()
{
Task[] tasks = new Task[6];
for (int i = 0; i < 6; ++i)
tasks[i] = JobDispatcher(1000 + i*1000, "task " + i);
await Task.WhenAll(tasks);
}