在 c# 中创建多线程应用程序以 运行 多个查询
Create a multi threaded applications to run multiple queries in c#
我正在尝试构建 Windows 异步查询的 Windows 表单工具。
该应用程序有一个数据网格视图,其中包含对 运行 的 30 个可能的查询。用户检查他想要执行的查询,比如说 10 个查询,然后点击一个按钮。
该应用程序有一个名为 maxthreads = 3 的变量(为了便于讨论),它指示可以使用多少线程来异步 运行 查询。生产环境中的查询 运行,我们不想同时使用太多线程 运行 使系统过载。每个查询 运行s 平均 30 秒。 (有的 5 分钟,有的 2 秒)
在 datagridview 中有一个包含图标的图像列,该图标描述了每个查询的状态(0-可用 运行,1-选择 运行ning,2- 运行,3 - 成功完成,-1 错误)
每次查询开始和结束时,我都需要能够与 UI 进行通信。查询完成后,结果将显示在 Tabcontrol 中包含的数据网格视图中(每个查询一个选项卡)
方法:我想创建一些 maxthread backgroundworkers 并让它们 运行 进行查询。当后台工作人员完成时,它会与 UI 通信并分配给新查询,依此类推,直到所有查询都已完成 运行。
我尝试使用 assignmentWorker 将工作分派给后台工作人员,但不知道如何等待所有线程完成。 bgw 完成后,它会向 assignmentWorker 报告 RunWorkerCompleted 事件的进度,但那个已经完成了。
在 UI 线程中,我用所有需要 运行:
的查询调用分配工作器
private void btnRunQueries_Click(object sender, EventArgs e)
{
if (AnyQueriesSelected())
{
tcResult.TabPages.Clear();
foreach (DataGridViewRow dgr in dgvQueries.Rows)
{
if (Convert.ToBoolean(dgr.Cells["chk"].Value))
{
Query q = new Query(dgr.Cells["ID"].Value.ToString(),
dgr.Cells["Name"].Value.ToString(),
dgr.Cells["FileName"].Value.ToString(),
dgr.Cells["ShortDescription"].Value.ToString(),
dgr.Cells["LongDescription"].Value.ToString(),
dgr.Cells["Level"].Value.ToString(),
dgr.Cells["Task"].Value.ToString(),
dgr.Cells["Importance"].Value.ToString(),
dgr.Cells["SkillSet"].Value.ToString(),
false,
new Dictionary<string, string>()
{ { "#ClntNb#", txtClntNum.Text }, { "#Staff#", "100300" } });
qryList.Add(q);
}
}
assignmentWorker.RunWorkerAsync(qryList);
}
else
{
MessageBox.Show("Please select at least one query.",
"Warning",
MessageBoxButtons.OK,
MessageBoxIcon.Information);
}
}
这是 AssignmentWorker:
private void assignmentWorker_DoWork(object sender, DoWorkEventArgs e)
{
foreach (Query q in (List<Query>)e.Argument)
{
while (!q.Processed)
{
for (int threadNum = 0; threadNum < maxThreads; threadNum++)
{
if (!threadArray[threadNum].IsBusy)
{
threadArray[threadNum].RunWorkerAsync(q);
q.Processed = true;
assignmentWorker.ReportProgress(1, q);
break;
}
}
//If all threads are being used, sleep awhile before checking again
if (!q.Processed)
{
Thread.Sleep(500);
}
}
}
}
所有bgw 运行同一事件:
private void backgroundWorkerFiles_DoWork(object sender, DoWorkEventArgs e)
{
try
{
Query qry = (Query)e.Argument;
DataTable dtNew = DataAccess.RunQuery(qry).dtResult;
if (dsQryResults.Tables.Contains(dtNew.TableName))
{
dsQryResults.Tables.Remove(dtNew.TableName);
}
dsQryResults.Tables.Add(dtNew);
e.Result = qry;
}
catch (Exception ex)
{
}
}
查询返回并且数据表已添加到数据集后:
private void backgroundWorkerFiles_RunWorkerCompleted(object sender,
RunWorkerCompletedEventArgs e)
{
try
{
if (e.Error != null)
{
assignmentWorker.ReportProgress(-1, e.Result);
}
else
{
assignmentWorker.ReportProgress(2, e.Result);
}
}
catch (Exception ex)
{
int o = 0;
}
}
我遇到的问题是分配工作人员在 bgw 完成之前完成,并且对 assignmentWorker.ReportProgress 的调用见鬼去吧(原谅我的法语)。
我如何才能等待所有启动的 bgw 完成才能完成分配工作人员?
谢谢!
如 中所述,您的设计过于复杂。如果您有特定的最大数量的应该同时执行的任务(查询),您可以并且应该简单地创建该数量的工作人员,并让他们从您的任务队列(或列表)中消耗任务,直到该队列为空。
由于缺乏能够简洁明了地说明您的具体情况的Minimal, Complete, and Verifiable code example,提供直接解决您的问题的代码是不可行的。但是,这里有一个使用 List<T>
作为原始代码的示例,它将像我上面描述的那样工作:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace TestSO42101517WaitAsyncTasks
{
class Program
{
static void Main(string[] args)
{
Random random = new Random();
int maxTasks = 30,
maxActive = 3,
maxDelayMs = 1000,
currentDelay = -1;
List<TimeSpan> taskDelays = new List<TimeSpan>(maxTasks);
for (int i = 0; i < maxTasks; i++)
{
taskDelays.Add(TimeSpan.FromMilliseconds(random.Next(maxDelayMs)));
}
Task[] tasks = new Task[maxActive];
object o = new object();
for (int i = 0; i < maxActive; i++)
{
int workerIndex = i;
tasks[i] = Task.Run(() =>
{
DelayConsumer(ref currentDelay, taskDelays, o, workerIndex);
});
}
Console.WriteLine("Waiting for consumer tasks");
Task.WaitAll(tasks);
Console.WriteLine("All consumer tasks completed");
}
private static void DelayConsumer(ref int currentDelay, List<TimeSpan> taskDelays, object o, int workerIndex)
{
Console.WriteLine($"worker #{workerIndex} starting");
while (true)
{
TimeSpan delay;
int delayIndex;
lock (o)
{
delayIndex = ++currentDelay;
if (delayIndex < taskDelays.Count)
{
delay = taskDelays[delayIndex];
}
else
{
Console.WriteLine($"worker #{workerIndex} exiting");
return;
}
}
Console.WriteLine($"worker #{workerIndex} sleeping for {delay.TotalMilliseconds} ms, task #{delayIndex}");
System.Threading.Thread.Sleep(delay);
}
}
}
}
在你的例子中,每个工人都会向某个全局状态报告进度。您没有为您的 "assignment" 工作人员显示 ReportProgress
处理程序,所以我不能具体说明这会是什么样子。但大概它会涉及将 -1
或 2
传递给一些知道如何处理这些值的方法(即,否则你的 ReportProgress
处理程序会是什么)。
请注意,如果您为任务使用实际的队列数据结构,代码可以稍微简化,尤其是在使用单个任务的地方。这种方法看起来像这样:
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace TestSO42101517WaitAsyncTasks
{
class Program
{
static void Main(string[] args)
{
Random random = new Random();
int maxTasks = 30,
maxActive = 3,
maxDelayMs = 1000,
currentDelay = -1;
ConcurrentQueue<TimeSpan> taskDelays = new ConcurrentQueue<TimeSpan>();
for (int i = 0; i < maxTasks; i++)
{
taskDelays.Enqueue(TimeSpan.FromMilliseconds(random.Next(maxDelayMs)));
}
Task[] tasks = new Task[maxActive];
for (int i = 0; i < maxActive; i++)
{
int workerIndex = i;
tasks[i] = Task.Run(() =>
{
DelayConsumer(ref currentDelay, taskDelays, workerIndex);
});
}
Console.WriteLine("Waiting for consumer tasks");
Task.WaitAll(tasks);
Console.WriteLine("All consumer tasks completed");
}
private static void DelayConsumer(ref int currentDelayIndex, ConcurrentQueue<TimeSpan> taskDelays, int workerIndex)
{
Console.WriteLine($"worker #{workerIndex} starting");
while (true)
{
TimeSpan delay;
if (!taskDelays.TryDequeue(out delay))
{
Console.WriteLine($"worker #{workerIndex} exiting");
return;
}
int delayIndex = System.Threading.Interlocked.Increment(ref currentDelayIndex);
Console.WriteLine($"worker #{workerIndex} sleeping for {delay.TotalMilliseconds} ms, task #{delayIndex}");
System.Threading.Thread.Sleep(delay);
}
}
}
}
我正在尝试构建 Windows 异步查询的 Windows 表单工具。 该应用程序有一个数据网格视图,其中包含对 运行 的 30 个可能的查询。用户检查他想要执行的查询,比如说 10 个查询,然后点击一个按钮。 该应用程序有一个名为 maxthreads = 3 的变量(为了便于讨论),它指示可以使用多少线程来异步 运行 查询。生产环境中的查询 运行,我们不想同时使用太多线程 运行 使系统过载。每个查询 运行s 平均 30 秒。 (有的 5 分钟,有的 2 秒) 在 datagridview 中有一个包含图标的图像列,该图标描述了每个查询的状态(0-可用 运行,1-选择 运行ning,2- 运行,3 - 成功完成,-1 错误) 每次查询开始和结束时,我都需要能够与 UI 进行通信。查询完成后,结果将显示在 Tabcontrol 中包含的数据网格视图中(每个查询一个选项卡)
方法:我想创建一些 maxthread backgroundworkers 并让它们 运行 进行查询。当后台工作人员完成时,它会与 UI 通信并分配给新查询,依此类推,直到所有查询都已完成 运行。
我尝试使用 assignmentWorker 将工作分派给后台工作人员,但不知道如何等待所有线程完成。 bgw 完成后,它会向 assignmentWorker 报告 RunWorkerCompleted 事件的进度,但那个已经完成了。
在 UI 线程中,我用所有需要 运行:
的查询调用分配工作器private void btnRunQueries_Click(object sender, EventArgs e)
{
if (AnyQueriesSelected())
{
tcResult.TabPages.Clear();
foreach (DataGridViewRow dgr in dgvQueries.Rows)
{
if (Convert.ToBoolean(dgr.Cells["chk"].Value))
{
Query q = new Query(dgr.Cells["ID"].Value.ToString(),
dgr.Cells["Name"].Value.ToString(),
dgr.Cells["FileName"].Value.ToString(),
dgr.Cells["ShortDescription"].Value.ToString(),
dgr.Cells["LongDescription"].Value.ToString(),
dgr.Cells["Level"].Value.ToString(),
dgr.Cells["Task"].Value.ToString(),
dgr.Cells["Importance"].Value.ToString(),
dgr.Cells["SkillSet"].Value.ToString(),
false,
new Dictionary<string, string>()
{ { "#ClntNb#", txtClntNum.Text }, { "#Staff#", "100300" } });
qryList.Add(q);
}
}
assignmentWorker.RunWorkerAsync(qryList);
}
else
{
MessageBox.Show("Please select at least one query.",
"Warning",
MessageBoxButtons.OK,
MessageBoxIcon.Information);
}
}
这是 AssignmentWorker:
private void assignmentWorker_DoWork(object sender, DoWorkEventArgs e)
{
foreach (Query q in (List<Query>)e.Argument)
{
while (!q.Processed)
{
for (int threadNum = 0; threadNum < maxThreads; threadNum++)
{
if (!threadArray[threadNum].IsBusy)
{
threadArray[threadNum].RunWorkerAsync(q);
q.Processed = true;
assignmentWorker.ReportProgress(1, q);
break;
}
}
//If all threads are being used, sleep awhile before checking again
if (!q.Processed)
{
Thread.Sleep(500);
}
}
}
}
所有bgw 运行同一事件:
private void backgroundWorkerFiles_DoWork(object sender, DoWorkEventArgs e)
{
try
{
Query qry = (Query)e.Argument;
DataTable dtNew = DataAccess.RunQuery(qry).dtResult;
if (dsQryResults.Tables.Contains(dtNew.TableName))
{
dsQryResults.Tables.Remove(dtNew.TableName);
}
dsQryResults.Tables.Add(dtNew);
e.Result = qry;
}
catch (Exception ex)
{
}
}
查询返回并且数据表已添加到数据集后:
private void backgroundWorkerFiles_RunWorkerCompleted(object sender,
RunWorkerCompletedEventArgs e)
{
try
{
if (e.Error != null)
{
assignmentWorker.ReportProgress(-1, e.Result);
}
else
{
assignmentWorker.ReportProgress(2, e.Result);
}
}
catch (Exception ex)
{
int o = 0;
}
}
我遇到的问题是分配工作人员在 bgw 完成之前完成,并且对 assignmentWorker.ReportProgress 的调用见鬼去吧(原谅我的法语)。 我如何才能等待所有启动的 bgw 完成才能完成分配工作人员?
谢谢!
如
由于缺乏能够简洁明了地说明您的具体情况的Minimal, Complete, and Verifiable code example,提供直接解决您的问题的代码是不可行的。但是,这里有一个使用 List<T>
作为原始代码的示例,它将像我上面描述的那样工作:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace TestSO42101517WaitAsyncTasks
{
class Program
{
static void Main(string[] args)
{
Random random = new Random();
int maxTasks = 30,
maxActive = 3,
maxDelayMs = 1000,
currentDelay = -1;
List<TimeSpan> taskDelays = new List<TimeSpan>(maxTasks);
for (int i = 0; i < maxTasks; i++)
{
taskDelays.Add(TimeSpan.FromMilliseconds(random.Next(maxDelayMs)));
}
Task[] tasks = new Task[maxActive];
object o = new object();
for (int i = 0; i < maxActive; i++)
{
int workerIndex = i;
tasks[i] = Task.Run(() =>
{
DelayConsumer(ref currentDelay, taskDelays, o, workerIndex);
});
}
Console.WriteLine("Waiting for consumer tasks");
Task.WaitAll(tasks);
Console.WriteLine("All consumer tasks completed");
}
private static void DelayConsumer(ref int currentDelay, List<TimeSpan> taskDelays, object o, int workerIndex)
{
Console.WriteLine($"worker #{workerIndex} starting");
while (true)
{
TimeSpan delay;
int delayIndex;
lock (o)
{
delayIndex = ++currentDelay;
if (delayIndex < taskDelays.Count)
{
delay = taskDelays[delayIndex];
}
else
{
Console.WriteLine($"worker #{workerIndex} exiting");
return;
}
}
Console.WriteLine($"worker #{workerIndex} sleeping for {delay.TotalMilliseconds} ms, task #{delayIndex}");
System.Threading.Thread.Sleep(delay);
}
}
}
}
在你的例子中,每个工人都会向某个全局状态报告进度。您没有为您的 "assignment" 工作人员显示 ReportProgress
处理程序,所以我不能具体说明这会是什么样子。但大概它会涉及将 -1
或 2
传递给一些知道如何处理这些值的方法(即,否则你的 ReportProgress
处理程序会是什么)。
请注意,如果您为任务使用实际的队列数据结构,代码可以稍微简化,尤其是在使用单个任务的地方。这种方法看起来像这样:
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace TestSO42101517WaitAsyncTasks
{
class Program
{
static void Main(string[] args)
{
Random random = new Random();
int maxTasks = 30,
maxActive = 3,
maxDelayMs = 1000,
currentDelay = -1;
ConcurrentQueue<TimeSpan> taskDelays = new ConcurrentQueue<TimeSpan>();
for (int i = 0; i < maxTasks; i++)
{
taskDelays.Enqueue(TimeSpan.FromMilliseconds(random.Next(maxDelayMs)));
}
Task[] tasks = new Task[maxActive];
for (int i = 0; i < maxActive; i++)
{
int workerIndex = i;
tasks[i] = Task.Run(() =>
{
DelayConsumer(ref currentDelay, taskDelays, workerIndex);
});
}
Console.WriteLine("Waiting for consumer tasks");
Task.WaitAll(tasks);
Console.WriteLine("All consumer tasks completed");
}
private static void DelayConsumer(ref int currentDelayIndex, ConcurrentQueue<TimeSpan> taskDelays, int workerIndex)
{
Console.WriteLine($"worker #{workerIndex} starting");
while (true)
{
TimeSpan delay;
if (!taskDelays.TryDequeue(out delay))
{
Console.WriteLine($"worker #{workerIndex} exiting");
return;
}
int delayIndex = System.Threading.Interlocked.Increment(ref currentDelayIndex);
Console.WriteLine($"worker #{workerIndex} sleeping for {delay.TotalMilliseconds} ms, task #{delayIndex}");
System.Threading.Thread.Sleep(delay);
}
}
}
}