在定时循环中处理多个线程,一出一入

Handle multiple threads, one out one in, in a timed loop

我需要在一夜之间处理大量文件,并定义开始和结束时间以避免打扰用户。我一直在调查,但现在处理线程的方法太多了,我不确定该走哪条路。这些文件作为附件进入 Exchange 收件箱。

基于此处的一些示例和一些实验,我目前的尝试是:

 while (DateTime.Now < dtEndTime.Value)
 {
            var finished = new CountdownEvent(1);
            for (int i = 0; i < numThreads; i++)
            {


                object state = offset;

                finished.AddCount();
                ThreadPool.QueueUserWorkItem(delegate
                {
                    try
                    {
                        StartProcessing(state);
                    }
                    finally
                    {
                        finished.Signal();
                    }
                });

                offset += numberOfFilesPerPoll;

            }
            finished.Signal();
            finished.Wait(); 


        }

它 运行 目前在 winforms 应用程序中很容易,但核心处理在一个 dll 中,所以我可以从 windows 服务中生成我需要的 class ,但是从调度程序下的控制台 运行ning 是最简单的。我确实有一个 Windows 服务设置了一个 Timer 对象,该对象在配置文件中设置的时间开始处理。

所以我的问题是 - 在上面的代码中,我初始化了一堆线程(目前是 10 个),然后等待它们全部处理。我的理想是静态数量的线程,当一个线程完成时,我会触发另一个线程,然后当我到达结束时间时,我只是等待所有线程完成。 这样做的原因是我正在处理的文件大小可变——有些可能需要几秒钟才能处理,有些可能需要几个小时,所以我不希望整个应用程序等待一个线程完成,如果我能让它继续运行的话在后台。 (编辑)就目前而言,每个线程实例化一个 class 并传递给它一个偏移量。 class 然后从收件箱中获取下 x 封电子邮件,从偏移量开始(使用 Exchange Web 服务分页功能)。在处理每个文件时,它会移动到一个单独的文件夹中。从目前的一些回复来看,我想知道我是否真的应该在外循环中获取电子邮件,并根据需要生成线程。 为了解决这个问题,我目前积压了很多电子邮件,我正试图处理这些电子邮件。清除积压后,每晚 运行 的负载可能会显着降低。

平均每晚要处理大约 1000 个文件。

更新

我重写了大部分代码以便可以使用 Parallel.Foreach,但我遇到了线程安全问题。调用代码现在如下所示:

public bool StartProcessing()
        {

            FindItemsResults<Item> emails = GetEmails();



            var source = new CancellationTokenSource(TimeSpan.FromHours(10));

            // Process files in parallel, with a maximum thread count.
            var opts = new ParallelOptions { MaxDegreeOfParallelism = 8, CancellationToken = source.Token };

            try
            {
                Parallel.ForEach(emails, opts, processAttachment);
            }

            catch (OperationCanceledException)
            {
                Console.WriteLine("Loop was cancelled.");
            }
            catch (Exception err)
            {
                WriteToLogFile(err.Message + "\r\n");
                WriteToLogFile(err.StackTrace + "r\n");
            }
            return true;
        }

到目前为止一切顺利(请原谅临时错误处理)。我现在有一个新问题,即 "Item" 对象的属性(电子邮件)不是线程安全的。因此,例如,当我开始处理一封电子邮件时,我将其移至 "processing" 文件夹,这样另一个进程就无法获取它 - 但事实证明,几个线程可能正在尝试处理相同的电子邮件-一次邮寄。我如何保证这不会发生?我知道我需要添加一个锁,我可以在 ForEach 中添加它还是应该在 processAttachments 方法中?

使用 TPL:

Parallel.ForEach( EnumerateFiles(),
                  new ParallelOptions { MaxDegreeOfParallelism = 10 },
                  file => ProcessFile( file ) );

EnumerateFiles在你的结束时间结束时停止枚举,像这样:

IEnumerable<string> EnumerateFiles()
{
    foreach (var file in Directory.EnumerateFiles( "*.txt" ))
        if (DateTime.Now < _endTime)
            yield return file;
        else
            yield break;
}

您可以结合使用 Parallel.ForEach() 和取消令牌源,它将在设定时间后取消操作:

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    static class Program
    {
        static Random rng = new Random();

        static void Main()
        {
            // Simulate having a list of files.
            var fileList = Enumerable.Range(1, 100000).Select(i => i.ToString());

            // For demo purposes, cancel after a few seconds.
            var source = new CancellationTokenSource(TimeSpan.FromSeconds(10));

            // Process files in parallel, with a maximum thread count.
            var opts = new ParallelOptions {MaxDegreeOfParallelism = 8, CancellationToken = source .Token};

            try
            {
                Parallel.ForEach(fileList, opts, processFile);
            }

            catch (OperationCanceledException)
            {
                Console.WriteLine("Loop was cancelled.");
            }
        }

        static void processFile(string file)
        {
            Console.WriteLine("Processing file: " + file);

            // Simulate taking a varying amount of time per file.

            int delay;

            lock (rng)
            {
                delay = rng.Next(200, 2000);
            }

            Thread.Sleep(delay);

            Console.WriteLine("Processed file: " + file);
        }
    }
}

作为使用取消令牌的替代方法,您可以编写一个方法 returns IEnumerable<string> 其中 returns 文件名列表,并在时间到了时停止返回它们,例如:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    static class Program
    {
        static Random rng = new Random();

        static void Main()
        {
            // Process files in parallel, with a maximum thread count.
            var opts = new ParallelOptions {MaxDegreeOfParallelism = 8};
            Parallel.ForEach(fileList(), opts, processFile);
        }

        static IEnumerable<string> fileList()
        {
            // Simulate having a list of files.
            var fileList = Enumerable.Range(1, 100000).Select(x => x.ToString()).ToArray();

            // Simulate finishing after a few seconds.
            DateTime endTime = DateTime.Now + TimeSpan.FromSeconds(10);

            int i = 0;

            while (DateTime.Now <= endTime)
                yield return fileList[i++];
        }

        static void processFile(string file)
        {
            Console.WriteLine("Processing file: " + file);

            // Simulate taking a varying amount of time per file.

            int delay;

            lock (rng)
            {
                delay = rng.Next(200, 2000);
            }

            Thread.Sleep(delay);

            Console.WriteLine("Processed file: " + file);
        }
    }
}

请注意,使用此方法不需要 try/catch。

您应该考虑使用 Microsoft 的 Reactive Framework。它使您可以使用 LINQ 查询以非常简单的方式处理多线程异步处理。

像这样:

var query =
    from file in filesToProcess.ToObservable()
    where DateTime.Now < stopTime
    from result in Observable.Start(() => StartProcessing(file))
    select new { file, result };

var subscription =
    query.Subscribe(x =>
    {
        /* handle result */
    });

真的,如果 StartProcessing 已经定义,这就是您需要的所有代码。

仅 NuGet "Rx-Main".

哦,要随时停止处理,只需调用 subscription.Dispose()

这是一项真正令人着迷的任务,我花了一段时间才将代码提升到我满意的水平。

我最终得到了以上的组合。

首先值得注意的是,我将以下行添加到我的网络服务调用中,因为我遇到了操作超时,我认为这是因为我超出了端点上设置的一些限制,实际上是由于微软在 .Net 2.0 中设置的限制:

ServicePointManager.DefaultConnectionLimit = int.MaxValue;
ServicePointManager.Expect100Continue = false;

查看此处了解更多信息:

What to set ServicePointManager.DefaultConnectionLimit to

添加这些代码行后,我的处理速度从 10 次/分钟增加到大约 100 次/分钟。

但我仍然对循环和分区等不满意。我的服务转移到物理服务器上以尽量减少 CPU 争用,我想让操作系统决定它的速度运行,而不是我的代码限制它。

经过一些研究,这就是我的最终结果 - 可以说不是我写过的最优雅的代码,但它非常快速和可靠。

List<XElement> elements = new List<XElement>();
 while (XMLDoc.ReadToFollowing("ElementName"))
     {
   using (XmlReader r = XMLDoc.ReadSubtree())
      {
   r.Read();
   XElement node = XElement.Load(r);
//do some processing of the node here...
elements.Add(node);
}
}
//And now pass the list of elements through PLinQ to the actual web service call, allowing the OS/framework to handle the parallelism

int failCount=0; //the method call below sets this per request; we log and continue

failCount = elements.AsParallel()
                            .Sum(element => IntegrationClass.DoRequest(element.ToString()));

结果非常简单,而且速度快如闪电。

我希望这对尝试做同样事情的其他人有所帮助!