在定时循环中处理多个线程,一出一入
Handle multiple threads, one out one in, in a timed loop
我需要在一夜之间处理大量文件,并定义开始和结束时间以避免打扰用户。我一直在调查,但现在处理线程的方法太多了,我不确定该走哪条路。这些文件作为附件进入 Exchange 收件箱。
基于此处的一些示例和一些实验,我目前的尝试是:
while (DateTime.Now < dtEndTime.Value)
{
var finished = new CountdownEvent(1);
for (int i = 0; i < numThreads; i++)
{
object state = offset;
finished.AddCount();
ThreadPool.QueueUserWorkItem(delegate
{
try
{
StartProcessing(state);
}
finally
{
finished.Signal();
}
});
offset += numberOfFilesPerPoll;
}
finished.Signal();
finished.Wait();
}
它 运行 目前在 winforms 应用程序中很容易,但核心处理在一个 dll 中,所以我可以从 windows 服务中生成我需要的 class ,但是从调度程序下的控制台 运行ning 是最简单的。我确实有一个 Windows 服务设置了一个 Timer 对象,该对象在配置文件中设置的时间开始处理。
所以我的问题是 - 在上面的代码中,我初始化了一堆线程(目前是 10 个),然后等待它们全部处理。我的理想是静态数量的线程,当一个线程完成时,我会触发另一个线程,然后当我到达结束时间时,我只是等待所有线程完成。
这样做的原因是我正在处理的文件大小可变——有些可能需要几秒钟才能处理,有些可能需要几个小时,所以我不希望整个应用程序等待一个线程完成,如果我能让它继续运行的话在后台。
(编辑)就目前而言,每个线程实例化一个 class 并传递给它一个偏移量。 class 然后从收件箱中获取下 x 封电子邮件,从偏移量开始(使用 Exchange Web 服务分页功能)。在处理每个文件时,它会移动到一个单独的文件夹中。从目前的一些回复来看,我想知道我是否真的应该在外循环中获取电子邮件,并根据需要生成线程。
为了解决这个问题,我目前积压了很多电子邮件,我正试图处理这些电子邮件。清除积压后,每晚 运行 的负载可能会显着降低。
平均每晚要处理大约 1000 个文件。
更新
我重写了大部分代码以便可以使用 Parallel.Foreach,但我遇到了线程安全问题。调用代码现在如下所示:
public bool StartProcessing()
{
FindItemsResults<Item> emails = GetEmails();
var source = new CancellationTokenSource(TimeSpan.FromHours(10));
// Process files in parallel, with a maximum thread count.
var opts = new ParallelOptions { MaxDegreeOfParallelism = 8, CancellationToken = source.Token };
try
{
Parallel.ForEach(emails, opts, processAttachment);
}
catch (OperationCanceledException)
{
Console.WriteLine("Loop was cancelled.");
}
catch (Exception err)
{
WriteToLogFile(err.Message + "\r\n");
WriteToLogFile(err.StackTrace + "r\n");
}
return true;
}
到目前为止一切顺利(请原谅临时错误处理)。我现在有一个新问题,即 "Item" 对象的属性(电子邮件)不是线程安全的。因此,例如,当我开始处理一封电子邮件时,我将其移至 "processing" 文件夹,这样另一个进程就无法获取它 - 但事实证明,几个线程可能正在尝试处理相同的电子邮件-一次邮寄。我如何保证这不会发生?我知道我需要添加一个锁,我可以在 ForEach 中添加它还是应该在 processAttachments 方法中?
使用 TPL:
Parallel.ForEach( EnumerateFiles(),
new ParallelOptions { MaxDegreeOfParallelism = 10 },
file => ProcessFile( file ) );
让EnumerateFiles
在你的结束时间结束时停止枚举,像这样:
IEnumerable<string> EnumerateFiles()
{
foreach (var file in Directory.EnumerateFiles( "*.txt" ))
if (DateTime.Now < _endTime)
yield return file;
else
yield break;
}
您可以结合使用 Parallel.ForEach()
和取消令牌源,它将在设定时间后取消操作:
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
static class Program
{
static Random rng = new Random();
static void Main()
{
// Simulate having a list of files.
var fileList = Enumerable.Range(1, 100000).Select(i => i.ToString());
// For demo purposes, cancel after a few seconds.
var source = new CancellationTokenSource(TimeSpan.FromSeconds(10));
// Process files in parallel, with a maximum thread count.
var opts = new ParallelOptions {MaxDegreeOfParallelism = 8, CancellationToken = source .Token};
try
{
Parallel.ForEach(fileList, opts, processFile);
}
catch (OperationCanceledException)
{
Console.WriteLine("Loop was cancelled.");
}
}
static void processFile(string file)
{
Console.WriteLine("Processing file: " + file);
// Simulate taking a varying amount of time per file.
int delay;
lock (rng)
{
delay = rng.Next(200, 2000);
}
Thread.Sleep(delay);
Console.WriteLine("Processed file: " + file);
}
}
}
作为使用取消令牌的替代方法,您可以编写一个方法 returns IEnumerable<string>
其中 returns 文件名列表,并在时间到了时停止返回它们,例如:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
static class Program
{
static Random rng = new Random();
static void Main()
{
// Process files in parallel, with a maximum thread count.
var opts = new ParallelOptions {MaxDegreeOfParallelism = 8};
Parallel.ForEach(fileList(), opts, processFile);
}
static IEnumerable<string> fileList()
{
// Simulate having a list of files.
var fileList = Enumerable.Range(1, 100000).Select(x => x.ToString()).ToArray();
// Simulate finishing after a few seconds.
DateTime endTime = DateTime.Now + TimeSpan.FromSeconds(10);
int i = 0;
while (DateTime.Now <= endTime)
yield return fileList[i++];
}
static void processFile(string file)
{
Console.WriteLine("Processing file: " + file);
// Simulate taking a varying amount of time per file.
int delay;
lock (rng)
{
delay = rng.Next(200, 2000);
}
Thread.Sleep(delay);
Console.WriteLine("Processed file: " + file);
}
}
}
请注意,使用此方法不需要 try/catch。
您应该考虑使用 Microsoft 的 Reactive Framework。它使您可以使用 LINQ 查询以非常简单的方式处理多线程异步处理。
像这样:
var query =
from file in filesToProcess.ToObservable()
where DateTime.Now < stopTime
from result in Observable.Start(() => StartProcessing(file))
select new { file, result };
var subscription =
query.Subscribe(x =>
{
/* handle result */
});
真的,如果 StartProcessing
已经定义,这就是您需要的所有代码。
仅 NuGet "Rx-Main".
哦,要随时停止处理,只需调用 subscription.Dispose()
。
这是一项真正令人着迷的任务,我花了一段时间才将代码提升到我满意的水平。
我最终得到了以上的组合。
首先值得注意的是,我将以下行添加到我的网络服务调用中,因为我遇到了操作超时,我认为这是因为我超出了端点上设置的一些限制,实际上是由于微软在 .Net 2.0 中设置的限制:
ServicePointManager.DefaultConnectionLimit = int.MaxValue;
ServicePointManager.Expect100Continue = false;
查看此处了解更多信息:
What to set ServicePointManager.DefaultConnectionLimit to
添加这些代码行后,我的处理速度从 10 次/分钟增加到大约 100 次/分钟。
但我仍然对循环和分区等不满意。我的服务转移到物理服务器上以尽量减少 CPU 争用,我想让操作系统决定它的速度运行,而不是我的代码限制它。
经过一些研究,这就是我的最终结果 - 可以说不是我写过的最优雅的代码,但它非常快速和可靠。
List<XElement> elements = new List<XElement>();
while (XMLDoc.ReadToFollowing("ElementName"))
{
using (XmlReader r = XMLDoc.ReadSubtree())
{
r.Read();
XElement node = XElement.Load(r);
//do some processing of the node here...
elements.Add(node);
}
}
//And now pass the list of elements through PLinQ to the actual web service call, allowing the OS/framework to handle the parallelism
int failCount=0; //the method call below sets this per request; we log and continue
failCount = elements.AsParallel()
.Sum(element => IntegrationClass.DoRequest(element.ToString()));
结果非常简单,而且速度快如闪电。
我希望这对尝试做同样事情的其他人有所帮助!
我需要在一夜之间处理大量文件,并定义开始和结束时间以避免打扰用户。我一直在调查,但现在处理线程的方法太多了,我不确定该走哪条路。这些文件作为附件进入 Exchange 收件箱。
基于此处的一些示例和一些实验,我目前的尝试是:
while (DateTime.Now < dtEndTime.Value)
{
var finished = new CountdownEvent(1);
for (int i = 0; i < numThreads; i++)
{
object state = offset;
finished.AddCount();
ThreadPool.QueueUserWorkItem(delegate
{
try
{
StartProcessing(state);
}
finally
{
finished.Signal();
}
});
offset += numberOfFilesPerPoll;
}
finished.Signal();
finished.Wait();
}
它 运行 目前在 winforms 应用程序中很容易,但核心处理在一个 dll 中,所以我可以从 windows 服务中生成我需要的 class ,但是从调度程序下的控制台 运行ning 是最简单的。我确实有一个 Windows 服务设置了一个 Timer 对象,该对象在配置文件中设置的时间开始处理。
所以我的问题是 - 在上面的代码中,我初始化了一堆线程(目前是 10 个),然后等待它们全部处理。我的理想是静态数量的线程,当一个线程完成时,我会触发另一个线程,然后当我到达结束时间时,我只是等待所有线程完成。 这样做的原因是我正在处理的文件大小可变——有些可能需要几秒钟才能处理,有些可能需要几个小时,所以我不希望整个应用程序等待一个线程完成,如果我能让它继续运行的话在后台。 (编辑)就目前而言,每个线程实例化一个 class 并传递给它一个偏移量。 class 然后从收件箱中获取下 x 封电子邮件,从偏移量开始(使用 Exchange Web 服务分页功能)。在处理每个文件时,它会移动到一个单独的文件夹中。从目前的一些回复来看,我想知道我是否真的应该在外循环中获取电子邮件,并根据需要生成线程。 为了解决这个问题,我目前积压了很多电子邮件,我正试图处理这些电子邮件。清除积压后,每晚 运行 的负载可能会显着降低。
平均每晚要处理大约 1000 个文件。
更新
我重写了大部分代码以便可以使用 Parallel.Foreach,但我遇到了线程安全问题。调用代码现在如下所示:
public bool StartProcessing()
{
FindItemsResults<Item> emails = GetEmails();
var source = new CancellationTokenSource(TimeSpan.FromHours(10));
// Process files in parallel, with a maximum thread count.
var opts = new ParallelOptions { MaxDegreeOfParallelism = 8, CancellationToken = source.Token };
try
{
Parallel.ForEach(emails, opts, processAttachment);
}
catch (OperationCanceledException)
{
Console.WriteLine("Loop was cancelled.");
}
catch (Exception err)
{
WriteToLogFile(err.Message + "\r\n");
WriteToLogFile(err.StackTrace + "r\n");
}
return true;
}
到目前为止一切顺利(请原谅临时错误处理)。我现在有一个新问题,即 "Item" 对象的属性(电子邮件)不是线程安全的。因此,例如,当我开始处理一封电子邮件时,我将其移至 "processing" 文件夹,这样另一个进程就无法获取它 - 但事实证明,几个线程可能正在尝试处理相同的电子邮件-一次邮寄。我如何保证这不会发生?我知道我需要添加一个锁,我可以在 ForEach 中添加它还是应该在 processAttachments 方法中?
使用 TPL:
Parallel.ForEach( EnumerateFiles(),
new ParallelOptions { MaxDegreeOfParallelism = 10 },
file => ProcessFile( file ) );
让EnumerateFiles
在你的结束时间结束时停止枚举,像这样:
IEnumerable<string> EnumerateFiles()
{
foreach (var file in Directory.EnumerateFiles( "*.txt" ))
if (DateTime.Now < _endTime)
yield return file;
else
yield break;
}
您可以结合使用 Parallel.ForEach()
和取消令牌源,它将在设定时间后取消操作:
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
static class Program
{
static Random rng = new Random();
static void Main()
{
// Simulate having a list of files.
var fileList = Enumerable.Range(1, 100000).Select(i => i.ToString());
// For demo purposes, cancel after a few seconds.
var source = new CancellationTokenSource(TimeSpan.FromSeconds(10));
// Process files in parallel, with a maximum thread count.
var opts = new ParallelOptions {MaxDegreeOfParallelism = 8, CancellationToken = source .Token};
try
{
Parallel.ForEach(fileList, opts, processFile);
}
catch (OperationCanceledException)
{
Console.WriteLine("Loop was cancelled.");
}
}
static void processFile(string file)
{
Console.WriteLine("Processing file: " + file);
// Simulate taking a varying amount of time per file.
int delay;
lock (rng)
{
delay = rng.Next(200, 2000);
}
Thread.Sleep(delay);
Console.WriteLine("Processed file: " + file);
}
}
}
作为使用取消令牌的替代方法,您可以编写一个方法 returns IEnumerable<string>
其中 returns 文件名列表,并在时间到了时停止返回它们,例如:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
static class Program
{
static Random rng = new Random();
static void Main()
{
// Process files in parallel, with a maximum thread count.
var opts = new ParallelOptions {MaxDegreeOfParallelism = 8};
Parallel.ForEach(fileList(), opts, processFile);
}
static IEnumerable<string> fileList()
{
// Simulate having a list of files.
var fileList = Enumerable.Range(1, 100000).Select(x => x.ToString()).ToArray();
// Simulate finishing after a few seconds.
DateTime endTime = DateTime.Now + TimeSpan.FromSeconds(10);
int i = 0;
while (DateTime.Now <= endTime)
yield return fileList[i++];
}
static void processFile(string file)
{
Console.WriteLine("Processing file: " + file);
// Simulate taking a varying amount of time per file.
int delay;
lock (rng)
{
delay = rng.Next(200, 2000);
}
Thread.Sleep(delay);
Console.WriteLine("Processed file: " + file);
}
}
}
请注意,使用此方法不需要 try/catch。
您应该考虑使用 Microsoft 的 Reactive Framework。它使您可以使用 LINQ 查询以非常简单的方式处理多线程异步处理。
像这样:
var query =
from file in filesToProcess.ToObservable()
where DateTime.Now < stopTime
from result in Observable.Start(() => StartProcessing(file))
select new { file, result };
var subscription =
query.Subscribe(x =>
{
/* handle result */
});
真的,如果 StartProcessing
已经定义,这就是您需要的所有代码。
仅 NuGet "Rx-Main".
哦,要随时停止处理,只需调用 subscription.Dispose()
。
这是一项真正令人着迷的任务,我花了一段时间才将代码提升到我满意的水平。
我最终得到了以上的组合。
首先值得注意的是,我将以下行添加到我的网络服务调用中,因为我遇到了操作超时,我认为这是因为我超出了端点上设置的一些限制,实际上是由于微软在 .Net 2.0 中设置的限制:
ServicePointManager.DefaultConnectionLimit = int.MaxValue;
ServicePointManager.Expect100Continue = false;
查看此处了解更多信息:
What to set ServicePointManager.DefaultConnectionLimit to
添加这些代码行后,我的处理速度从 10 次/分钟增加到大约 100 次/分钟。
但我仍然对循环和分区等不满意。我的服务转移到物理服务器上以尽量减少 CPU 争用,我想让操作系统决定它的速度运行,而不是我的代码限制它。
经过一些研究,这就是我的最终结果 - 可以说不是我写过的最优雅的代码,但它非常快速和可靠。
List<XElement> elements = new List<XElement>();
while (XMLDoc.ReadToFollowing("ElementName"))
{
using (XmlReader r = XMLDoc.ReadSubtree())
{
r.Read();
XElement node = XElement.Load(r);
//do some processing of the node here...
elements.Add(node);
}
}
//And now pass the list of elements through PLinQ to the actual web service call, allowing the OS/framework to handle the parallelism
int failCount=0; //the method call below sets this per request; we log and continue
failCount = elements.AsParallel()
.Sum(element => IntegrationClass.DoRequest(element.ToString()));
结果非常简单,而且速度快如闪电。
我希望这对尝试做同样事情的其他人有所帮助!