在 c# 中创建多线程应用程序以 运行 多个查询

Create a multi threaded applications to run multiple queries in c#

我正在尝试构建 Windows 异步查询的 Windows 表单工具。 该应用程序有一个数据网格视图,其中包含对 运行 的 30 个可能的查询。用户检查他想要执行的查询,比如说 10 个查询,然后点击一个按钮。 该应用程序有一个名为 maxthreads = 3 的变量(为了便于讨论),它指示可以使用多少线程来异步 运行 查询。生产环境中的查询 运行,我们不想同时使用太多线程 运行 使系统过载。每个查询 运行s 平均 30 秒。 (有的 5 分钟,有的 2 秒) 在 datagridview 中有一个包含图标的图像列,该图标描述了每个查询的状态(0-可用 运行,1-选择 运行ning,2- 运行,3 - 成功完成,-1 错误) 每次查询开始和结束时,我都需要能够与 UI 进行通信。查询完成后,结果将显示在 Tabcontrol 中包含的数据网格视图中(每个查询一个选项卡)

方法:我想创建一些 maxthread backgroundworkers 并让它们 运行 进行查询。当后台工作人员完成时,它会与 UI 通信并分配给新查询,依此类推,直到所有查询都已完成 运行。

我尝试使用 assignmentWorker 将工作分派给后台工作人员,但不知道如何等待所有线程完成。 bgw 完成后,它会向 assignmentWorker 报告 RunWorkerCompleted 事件的进度,但那个已经完成了。

在 UI 线程中,我用所有需要 运行:

的查询调用分配工作器
private void btnRunQueries_Click(object sender, EventArgs e)
    {
        if (AnyQueriesSelected())
        {
            tcResult.TabPages.Clear();

            foreach (DataGridViewRow dgr in dgvQueries.Rows)
            {
                if (Convert.ToBoolean(dgr.Cells["chk"].Value))
                {
                    Query q = new Query(dgr.Cells["ID"].Value.ToString(),
                        dgr.Cells["Name"].Value.ToString(),
                        dgr.Cells["FileName"].Value.ToString(),
                        dgr.Cells["ShortDescription"].Value.ToString(),
                        dgr.Cells["LongDescription"].Value.ToString(),
                        dgr.Cells["Level"].Value.ToString(),
                        dgr.Cells["Task"].Value.ToString(),
                        dgr.Cells["Importance"].Value.ToString(),
                        dgr.Cells["SkillSet"].Value.ToString(),
                        false,
                        new Dictionary<string, string>() 
                        { { "#ClntNb#", txtClntNum.Text }, { "#Staff#", "100300" } });

                    qryList.Add(q);
                }
            }
            assignmentWorker.RunWorkerAsync(qryList);
        }
        else
        {
            MessageBox.Show("Please select at least one query.",
                            "Warning",
                            MessageBoxButtons.OK,
                            MessageBoxIcon.Information);
        }
    }

这是 AssignmentWorker:

private void assignmentWorker_DoWork(object sender, DoWorkEventArgs e)
    {
        foreach (Query q in (List<Query>)e.Argument)
        {
            while (!q.Processed)
            {
                for (int threadNum = 0; threadNum < maxThreads; threadNum++)
                {
                    if (!threadArray[threadNum].IsBusy)
                    {
                        threadArray[threadNum].RunWorkerAsync(q);
                        q.Processed = true;
                        assignmentWorker.ReportProgress(1, q);
                        break;
                    }
                }

                //If all threads are being used, sleep awhile before checking again
                if (!q.Processed)
                {
                    Thread.Sleep(500);
                }
            }
        }
    }

所有bgw 运行同一事件:

private void backgroundWorkerFiles_DoWork(object sender, DoWorkEventArgs e)
    {
        try
        {
            Query qry = (Query)e.Argument;

            DataTable dtNew = DataAccess.RunQuery(qry).dtResult;

            if (dsQryResults.Tables.Contains(dtNew.TableName))
            {
                dsQryResults.Tables.Remove(dtNew.TableName);
            }

            dsQryResults.Tables.Add(dtNew);

            e.Result = qry;
        }
        catch (Exception ex)
        {

        }
    }

查询返回并且数据表已添加到数据集后:

private void backgroundWorkerFiles_RunWorkerCompleted(object sender, 
                                                    RunWorkerCompletedEventArgs e)
    {
        try
        {
            if (e.Error != null)
            {
                assignmentWorker.ReportProgress(-1, e.Result);
            }
            else
            {
                assignmentWorker.ReportProgress(2, e.Result);
            }
        }
        catch (Exception ex)
        {
            int o = 0;
        }
    }

我遇到的问题是分配工作人员在 bgw 完成之前完成,并且对 assignmentWorker.ReportProgress 的调用见鬼去吧(原谅我的法语)。 我如何才能等待所有启动的 bgw 完成才能完成分配工作人员?

谢谢!

中所述,您的设计过于复杂。如果您有特定的最大数量的应该同时执行的任务(查询),您可以并且应该简单地创建该数量的工作人员,并让他们从您的任务队列(或列表)中消耗任务,直到该队列为空。

由于缺乏能够简洁明了地说明您的具体情况的Minimal, Complete, and Verifiable code example,提供直接解决您的问题的代码是不可行的。但是,这里有一个使用 List<T> 作为原始代码的示例,它将像我上面描述的那样工作:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace TestSO42101517WaitAsyncTasks
{
    class Program
    {
        static void Main(string[] args)
        {
            Random random = new Random();
            int maxTasks = 30,
                maxActive = 3,
                maxDelayMs = 1000,
                currentDelay = -1;
            List<TimeSpan> taskDelays = new List<TimeSpan>(maxTasks);

            for (int i = 0; i < maxTasks; i++)
            {
                taskDelays.Add(TimeSpan.FromMilliseconds(random.Next(maxDelayMs)));
            }

            Task[] tasks = new Task[maxActive];
            object o = new object();

            for (int i = 0; i < maxActive; i++)
            {
                int workerIndex = i;

                tasks[i] = Task.Run(() =>
                {
                    DelayConsumer(ref currentDelay, taskDelays, o, workerIndex);
                });
            }

            Console.WriteLine("Waiting for consumer tasks");

            Task.WaitAll(tasks);

            Console.WriteLine("All consumer tasks completed");
        }

        private static void DelayConsumer(ref int currentDelay, List<TimeSpan> taskDelays, object o, int workerIndex)
        {
            Console.WriteLine($"worker #{workerIndex} starting");

            while (true)
            {
                TimeSpan delay;    
                int delayIndex;

                lock (o)
                {
                    delayIndex = ++currentDelay;
                    if (delayIndex < taskDelays.Count)
                    {
                        delay = taskDelays[delayIndex];
                    }
                    else
                    {
                        Console.WriteLine($"worker #{workerIndex} exiting");
                        return;
                    }
                }

                Console.WriteLine($"worker #{workerIndex} sleeping for {delay.TotalMilliseconds} ms, task #{delayIndex}");
                System.Threading.Thread.Sleep(delay);
            }
        }
    }
}

在你的例子中,每个工人都会向某个全局状态报告进度。您没有为您的 "assignment" 工作人员显示 ReportProgress 处理程序,所以我不能具体说明这会是什么样子。但大概它会涉及将 -12 传递给一些知道如何处理这些值的方法(即,否则你的 ReportProgress 处理程序会是什么)。

请注意,如果您为任务使用实际的队列数据结构,代码可以稍微简化,尤其是在使用单个任务的地方。这种方法看起来像这样:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace TestSO42101517WaitAsyncTasks
{
    class Program
    {
        static void Main(string[] args)
        {
            Random random = new Random();
            int maxTasks = 30,
                maxActive = 3,
                maxDelayMs = 1000,
                currentDelay = -1;
            ConcurrentQueue<TimeSpan> taskDelays = new ConcurrentQueue<TimeSpan>();

            for (int i = 0; i < maxTasks; i++)
            {
                taskDelays.Enqueue(TimeSpan.FromMilliseconds(random.Next(maxDelayMs)));
            }

            Task[] tasks = new Task[maxActive];

            for (int i = 0; i < maxActive; i++)
            {
                int workerIndex = i;

                tasks[i] = Task.Run(() =>
                {
                    DelayConsumer(ref currentDelay, taskDelays, workerIndex);
                });
            }

            Console.WriteLine("Waiting for consumer tasks");

            Task.WaitAll(tasks);

            Console.WriteLine("All consumer tasks completed");
        }

        private static void DelayConsumer(ref int currentDelayIndex, ConcurrentQueue<TimeSpan> taskDelays, int workerIndex)
        {
            Console.WriteLine($"worker #{workerIndex} starting");

            while (true)
            {
                TimeSpan delay;

                if (!taskDelays.TryDequeue(out delay))
                {
                    Console.WriteLine($"worker #{workerIndex} exiting");
                    return;
                }

                int delayIndex = System.Threading.Interlocked.Increment(ref currentDelayIndex);

                Console.WriteLine($"worker #{workerIndex} sleeping for {delay.TotalMilliseconds} ms, task #{delayIndex}");
                System.Threading.Thread.Sleep(delay);
            }
        }
    }
}