如何管理异步事件的线程阻塞和解除阻塞?
How to manage thread blocking and unblocking with asynchronous events?
背景
我目前正在使用可在 和 中运行的 GUI 重新创建某些功能,但是是通过终端界面。所以错误不在事件触发的另一端,因为它以原始 GUI 形式工作。
我运行一个任务由多台机器上的子任务组成。
我订阅了在取得进展时触发的事件,并打印出一条描述性消息。
将为所有 Y 机器的每个 X 子任务打印一条消息。
然后发生异步多线程操作。
我想为每台机器的每个子任务打印一条消息,只解决一次。
我跟踪子任务的完成情况,并保留一个二维布尔数组,其中行是机器,列是子任务。
问题
调试时我可以看到正在输入下面的事件处理程序方法。 numOfSubtasksFoundEventHandler 中的打印语句是 运行,但在我到达设置 AutoReset 事件之前,多个 BigTask 事件被触发,在 .WaitOne 处被阻止。
然而,尽管 numOfSubtasksFound.Set() 是 运行 之后的事实,但没有打印任何其他内容,程序也没有完成执行。没有什么能超越 numOfSubtasksFound.WaitOne 秒。
如果我在 BigTaskHandler 方法中取出 numOfSubtasksFound.WaitOne,我会收到类似的行为,但有几条消息表明 BigTask 已完成,然后程序在其他地方停止。
这里管理阻塞和解除阻塞的最佳方法是什么,或者有什么小的修复方法吗?
目标
我需要的是一种方法来阻止子任务事件处理程序方法的运行,直到 numOfSubtasksFoundEventHandler 有 运行 一次。我只需要 numOfSubTasksFoundEventHandler 到 运行 一次。
当前子任务事件处理程序未正确解锁。 numOfSubtasksFound.Set() 之后永远不会执行 switch case 代码;是 运行.
//MAIN
bool[] machinesDoneTasks = new bool[numOfMachines];
bool[][] machinesDoneSubtasks = new bool[numOfMachines][];
try
{
//thread/task blocking
numOfSubtasksFound = new AutoResetEvent(false);
AllSubTasksDone = new AutoResetEvent(false);
AllBigTasksDone = new AutoResetEvent(false);
//Subscribe to events to get number of subtasks and print useful information as tasks progress
numOfSubtasksFoundEvent += numOfSubtasksFoundEventHandler;
SubTaskProgEvent += SubTaskEventProgHandler; //prog stands for progress
BigTaskProgEvent += BigTaskProgEventHandler;
RunAllTasksOnAllMachines();//this will trigger the events above
//Don't exit program until those descriptive messages have been printed
numOfSubtasksFound.WaitOne();
AllSubTasksDone.WaitOne();
//SubTaskProgEvent -= SubTaskProgEventHandler;
AllBigTasksDone.WaitOne();
//BigTaskProgEvent -= BigTaskProgEventHandler;
}
catch (Exception e)
{
//print exceptions
}
//END MAIN
以下不一定是第一个触发的事件。
internal void numOfSubtasksFoundEventHandler(object sender, EventArgs e)
{
//get number of subtasks from args after checking for nulls, empty arrays
for (int i = 0; i < numOfSubtasks; i++)
machinesDoneSubtasks[i] = new bool[numOfSubtasks];
Console.WriteLine("number of subtasks found");
numOfSubtasksFoundEvent -= numOfSubtasksFoundEventHandler;//don't subscribe to event where we get this from anymore
if (numOfSubtasksFound != null)
numOfSubtasksFound.Set(); //stop blocking
}
子任务事件不一定在大任务事件之前得到处理。
internal void SubtaskEventProgHandler(object sender, EventArgs e)
{
//null, empty checks on args
//Wait until we know how many subtasks there are and the 2D boolean array is fully built
numOfSubtasksFound.WaitOne();
switch (e.WhatHappened)
{
Case.TaskComplete:
Console.Write(e.Machine + " is done subtask " + e.subTask);
//logic to determine machine and subtask
machinesDoneSubtasks[machine][Subtask] = true;
if (AllSubTasksDone != null && machinesDoneSubtasks.OfType<bool>().All(x => x))
AllSubTasksDone.Set(); //stop blocking when 2D array is all true
break;
//other cases, different prints, but same idea
}
}
BigTask 进度事件发生在处理的开始中间和结束。我只打印我想要的案例的详细信息。
internal void BigTaskProgEventHandler(object sender, EventArgs e)
{
//Wait until we know how many subtasks there are and the 2D boolean array is fully built before printing
numOfSubtasksFound.WaitOne();
//null, empty exception checks
switch (e.WhatHappened)
{
Case.TaskComplete:
Console.Write(e.Machine + " is done task " + e.subTask);
//logic to determine machine
machinesDoneTasks[machine] = true;
if (AllBigTasksDone != null && machinesDoneTasks.All(x => x))
AllBigTasksDone.Set();
break;
}
//other cases, different prints, but same idea
}
async/await 模型示例。
每台机器上的一些任务 运行 并计算一个值。
完成所有任务后,值将显示在控制台上。
static void Main(string[] args)
{
var service = new DispatchTasksOnMachinesService(8, 3);
service.DispatchTasks();
Console.Read();
}
class DispatchTasksOnMachinesService
{
int numOfMachines;
int tasksPerMachine;
[ThreadStatic]
private Random random = new Random();
public DispatchTasksOnMachinesService(int numOfMachines, int tasksPerMachine)
{
this.numOfMachines = numOfMachines;
this.tasksPerMachine = tasksPerMachine;
}
public async void DispatchTasks()
{
var tasks = new List<Task<Tuple<Guid, Machine, int>>>();
for (int i = 0; i < this.numOfMachines; i++)
{
var j = i;
for (int k = 0; k < this.tasksPerMachine; k++)
{
var task = Task.Run(() => Foo(Guid.NewGuid(), new Machine("machine" + j)));
tasks.Add(task);
}
}
var results = await Task.WhenAll<Tuple<Guid, Machine, int>>(tasks);
foreach (var result in results)
{
Console.WriteLine($"Task {result.Item1} on {result.Item2} yielded result {result.Item3}");
}
}
private Tuple<Guid, Machine, int> Foo(Guid taskId, Machine machine)
{
Thread.Sleep(TimeSpan.FromSeconds(random.Next(1,5)));
Console.WriteLine($"Task {taskId} has completed on {machine}");
return new Tuple<Guid, Machine, int>(taskId, machine, random.Next(500, 2000));
}
}
class Machine
{
public string Name { get; private set; }
public Machine(string name)
{
this.Name = name;
}
public override string ToString()
{
return this.Name;
}
}
我的问题是,在触发第一个事件后,其他子任务事件处理程序事件将调用 .WaitOne,这将阻塞。这可能在发现子任务的数量之后发生。问题是 .Set 只会被调用一次,而且永远不会被解除阻塞。
因此,当发现子任务的数量时使用布尔标志来设置,并且锁定子任务事件处理程序是可行的方法。
背景
我目前正在使用可在 和 中运行的 GUI 重新创建某些功能,但是是通过终端界面。所以错误不在事件触发的另一端,因为它以原始 GUI 形式工作。
我运行一个任务由多台机器上的子任务组成。
我订阅了在取得进展时触发的事件,并打印出一条描述性消息。 将为所有 Y 机器的每个 X 子任务打印一条消息。
然后发生异步多线程操作。
我想为每台机器的每个子任务打印一条消息,只解决一次。
我跟踪子任务的完成情况,并保留一个二维布尔数组,其中行是机器,列是子任务。
问题
调试时我可以看到正在输入下面的事件处理程序方法。 numOfSubtasksFoundEventHandler 中的打印语句是 运行,但在我到达设置 AutoReset 事件之前,多个 BigTask 事件被触发,在 .WaitOne 处被阻止。
然而,尽管 numOfSubtasksFound.Set() 是 运行 之后的事实,但没有打印任何其他内容,程序也没有完成执行。没有什么能超越 numOfSubtasksFound.WaitOne 秒。
如果我在 BigTaskHandler 方法中取出 numOfSubtasksFound.WaitOne,我会收到类似的行为,但有几条消息表明 BigTask 已完成,然后程序在其他地方停止。
这里管理阻塞和解除阻塞的最佳方法是什么,或者有什么小的修复方法吗?
目标
我需要的是一种方法来阻止子任务事件处理程序方法的运行,直到 numOfSubtasksFoundEventHandler 有 运行 一次。我只需要 numOfSubTasksFoundEventHandler 到 运行 一次。
当前子任务事件处理程序未正确解锁。 numOfSubtasksFound.Set() 之后永远不会执行 switch case 代码;是 运行.
//MAIN
bool[] machinesDoneTasks = new bool[numOfMachines];
bool[][] machinesDoneSubtasks = new bool[numOfMachines][];
try
{
//thread/task blocking
numOfSubtasksFound = new AutoResetEvent(false);
AllSubTasksDone = new AutoResetEvent(false);
AllBigTasksDone = new AutoResetEvent(false);
//Subscribe to events to get number of subtasks and print useful information as tasks progress
numOfSubtasksFoundEvent += numOfSubtasksFoundEventHandler;
SubTaskProgEvent += SubTaskEventProgHandler; //prog stands for progress
BigTaskProgEvent += BigTaskProgEventHandler;
RunAllTasksOnAllMachines();//this will trigger the events above
//Don't exit program until those descriptive messages have been printed
numOfSubtasksFound.WaitOne();
AllSubTasksDone.WaitOne();
//SubTaskProgEvent -= SubTaskProgEventHandler;
AllBigTasksDone.WaitOne();
//BigTaskProgEvent -= BigTaskProgEventHandler;
}
catch (Exception e)
{
//print exceptions
}
//END MAIN
以下不一定是第一个触发的事件。
internal void numOfSubtasksFoundEventHandler(object sender, EventArgs e)
{
//get number of subtasks from args after checking for nulls, empty arrays
for (int i = 0; i < numOfSubtasks; i++)
machinesDoneSubtasks[i] = new bool[numOfSubtasks];
Console.WriteLine("number of subtasks found");
numOfSubtasksFoundEvent -= numOfSubtasksFoundEventHandler;//don't subscribe to event where we get this from anymore
if (numOfSubtasksFound != null)
numOfSubtasksFound.Set(); //stop blocking
}
子任务事件不一定在大任务事件之前得到处理。
internal void SubtaskEventProgHandler(object sender, EventArgs e)
{
//null, empty checks on args
//Wait until we know how many subtasks there are and the 2D boolean array is fully built
numOfSubtasksFound.WaitOne();
switch (e.WhatHappened)
{
Case.TaskComplete:
Console.Write(e.Machine + " is done subtask " + e.subTask);
//logic to determine machine and subtask
machinesDoneSubtasks[machine][Subtask] = true;
if (AllSubTasksDone != null && machinesDoneSubtasks.OfType<bool>().All(x => x))
AllSubTasksDone.Set(); //stop blocking when 2D array is all true
break;
//other cases, different prints, but same idea
}
}
BigTask 进度事件发生在处理的开始中间和结束。我只打印我想要的案例的详细信息。
internal void BigTaskProgEventHandler(object sender, EventArgs e)
{
//Wait until we know how many subtasks there are and the 2D boolean array is fully built before printing
numOfSubtasksFound.WaitOne();
//null, empty exception checks
switch (e.WhatHappened)
{
Case.TaskComplete:
Console.Write(e.Machine + " is done task " + e.subTask);
//logic to determine machine
machinesDoneTasks[machine] = true;
if (AllBigTasksDone != null && machinesDoneTasks.All(x => x))
AllBigTasksDone.Set();
break;
}
//other cases, different prints, but same idea
}
async/await 模型示例。 每台机器上的一些任务 运行 并计算一个值。 完成所有任务后,值将显示在控制台上。
static void Main(string[] args)
{
var service = new DispatchTasksOnMachinesService(8, 3);
service.DispatchTasks();
Console.Read();
}
class DispatchTasksOnMachinesService
{
int numOfMachines;
int tasksPerMachine;
[ThreadStatic]
private Random random = new Random();
public DispatchTasksOnMachinesService(int numOfMachines, int tasksPerMachine)
{
this.numOfMachines = numOfMachines;
this.tasksPerMachine = tasksPerMachine;
}
public async void DispatchTasks()
{
var tasks = new List<Task<Tuple<Guid, Machine, int>>>();
for (int i = 0; i < this.numOfMachines; i++)
{
var j = i;
for (int k = 0; k < this.tasksPerMachine; k++)
{
var task = Task.Run(() => Foo(Guid.NewGuid(), new Machine("machine" + j)));
tasks.Add(task);
}
}
var results = await Task.WhenAll<Tuple<Guid, Machine, int>>(tasks);
foreach (var result in results)
{
Console.WriteLine($"Task {result.Item1} on {result.Item2} yielded result {result.Item3}");
}
}
private Tuple<Guid, Machine, int> Foo(Guid taskId, Machine machine)
{
Thread.Sleep(TimeSpan.FromSeconds(random.Next(1,5)));
Console.WriteLine($"Task {taskId} has completed on {machine}");
return new Tuple<Guid, Machine, int>(taskId, machine, random.Next(500, 2000));
}
}
class Machine
{
public string Name { get; private set; }
public Machine(string name)
{
this.Name = name;
}
public override string ToString()
{
return this.Name;
}
}
我的问题是,在触发第一个事件后,其他子任务事件处理程序事件将调用 .WaitOne,这将阻塞。这可能在发现子任务的数量之后发生。问题是 .Set 只会被调用一次,而且永远不会被解除阻塞。
因此,当发现子任务的数量时使用布尔标志来设置,并且锁定子任务事件处理程序是可行的方法。