FileWatcher 问题
FileWatcher issue
好的,所以这次我创建了一个带有文件观察器的服务来处理创建后的文件。
当正在处理的文件达到 1000 时(我正在接收大量消息),我的服务似乎崩溃了。
这是我的逻辑:文件进来,文件观察者读取文本发送到电子邮件,插入数据库,将原始消息移动到文件夹。
在服务启动时,我在开始观看之前先处理待处理的消息(我说的是超过 1000 个待处理的文本文件)并且我的服务需要大约一秒钟来处理每个文件。
一切正常,但是当传入文件总数达到 1000 时,它就崩溃了。
有时服务会停止处理挂起,只开始寻找新文件。
我有 "InternalBufferSize = 64000" 建议的最大值。
请帮助我的代码(我知道它应该是多线程的以便更好地处理,但我不是那个专家):
protected override void OnStart(string[] args)
{
using(TREEEntities TEX = new TREEEntities())
{
var mp= TEX.TREE_settings.FirstOrDefault(x=>x.SET_key =="MSGDump");
MsgsPath = mp.SET_value;
var dc = TEX.TREE_settings.FirstOrDefault(x => x.SET_key == "DupCash");
DupCash = Convert.ToInt16(dc.SET_value);
}
if (Directory.Exists(MsgsPath))
{
if (!Directory.Exists(MsgsPath+"\Archive"))
{
Directory.CreateDirectory(MsgsPath+"\Archive");
}
if (!Directory.Exists(MsgsPath + "\Duplicates"))
{
Directory.CreateDirectory(MsgsPath + "\Duplicates");
}
if (!Directory.Exists(MsgsPath + "\Unsent"))
{
Directory.CreateDirectory(MsgsPath + "\Unsent");
}
}
else
{
Directory.CreateDirectory(MsgsPath);
Directory.CreateDirectory(MsgsPath + "\Archive");
Directory.CreateDirectory(MsgsPath + "\Duplicates");
Directory.CreateDirectory(MsgsPath + "\Unsent");
}
processPending();//<--- process pending files after last service stop
fileSystemWatcher1.Path = MsgsPath;//<--- path to be watched
fileSystemWatcher1.EnableRaisingEvents = true;
fileSystemWatcher1.InternalBufferSize = 64000;
addToLog(DateTime.Now, "Service Started", 0, "Service", "Info");
addToLog(DateTime.Now, "File Watcher Started", 0, "Service", "Info");
//dupList.Clear();//<--- clear duplicates validation list
}
protected override void OnStop()
{
fileSystemWatcher1.EnableRaisingEvents = false;
addToLog(DateTime.Now, "File Watcher Stopped", 0, "Service", "Alert");
addToLog(DateTime.Now, "Service Stopped", 0, "Service", "Alert");
}
private void fileSystemWatcher1_Created(object sender, FileSystemEventArgs e)
{
try
{
//---------read from file------------
Thread.Sleep(200);//<---give the file some time to get released
string block;
using (StreamReader sr = File.OpenText(MsgsPath + "\" + e.Name))
{
block = sr.ReadToEnd();
}
PRT = block.Substring(block.Length - 6, 6);//<--- get the printer name
seq = Convert.ToInt16(block.Substring(block.Length - 20, 20).Substring(0, 4));//<--- get the sequence number
switch (PRT)//<----track sequence number from the 3 printers
{
case "64261B"://<---prt1
int seqPlus1=0;
if(seqPrt1 == 9999)//<---ignore sequence change from 9999 to 1
{ seqPlus1 = 1; }
else { seqPlus1 = seqPrt1 + 1; }
if (seq != seqPlus1 && seqPrt1 != 0)//<---"0" to avoid first service start
{
int x = seq - seqPrt1 - 1;
for (int i = 1; i <= x; i++)
{
addToMissing(PRT, seqPlus1);
addToLog(DateTime.Now, "Missing Sequence Number On Printer: " + PRT + " - " + seqPlus1, seqPlus1, "Service", "Missing");
seqPlus1++;
}
seqPrt1 = seq;
}
else { seqPrt1 = seq; }
break;
case "24E9AA"://<---prt2
int seqPlus2=0;
if(seqPrt2 == 9999)
{ seqPlus2 = 1; }
if (seq != seqPlus2 && seqPrt2 != 0)
{
int x = seq - seqPrt2 - 1;
for (int i = 1; i <= x; i++)
{
addToMissing(PRT, seqPlus2);
addToLog(DateTime.Now, "Missing Sequence Number On Printer: " + PRT + " - " + seqPlus2, seqPlus2, "Service", "Missing");
seqPlus2++;
}
seqPrt2 = seq;
}
else { seqPrt2 = seq; }
break;
case "642602"://<---prt3
int seqPlus3=0;
if(seqPrt3 == 9999)
{ seqPlus3 = 1; }
if (seq != seqPlus3 && seqPrt3 != 0)
{
int x = seq - seqPrt3 - 1;
for (int i = 1; i <= x; i++)
{
addToMissing(PRT, seqPlus3);
addToLog(DateTime.Now, "Missing Sequence Number On Printer: " + PRT + " - " + seqPlus3, seqPlus3, "Service", "Missing");
seqPlus3++;
}
seqPrt3 = seq;
}
else { seqPrt3 = seq; }
break;
}
block = block.Remove(block.Length - 52);//<--- trim the sequence number and unwanted info
string[] Alladd;
List<string> sent = new List<string>();
if (!dupList.Contains(block)) //<--- if msg not found in duplicates validation list
{
//--------extract values--------------
if (block.Substring(0, 3) == "\r\nQ") //<--- if the msg. contains a priority code
{
Alladd = block.Substring(0, block.IndexOf(".")).Replace("\r\n", " ").Substring(4).Split(' ').Distinct().Where(x => !string.IsNullOrEmpty(x)).ToArray(); ;
}
else//<--- if no priority code
{
Alladd = block.Substring(0, block.IndexOf(".")).Replace("\r\n", " ").Substring(1).Split(' ').Distinct().Where(x => !string.IsNullOrEmpty(x)).ToArray(); ;
}
string From = block.Substring(block.IndexOf('.') + 1).Substring(0, 7);
string Msg = block.Substring(block.IndexOf('.') + 1);
Msg = Msg.Substring(Msg.IndexOf('\n') + 1);
//--------add msg content to the DB group table--------
using (TREEEntities TE1 = new TREEEntities())
{
TREE_group tg = new TREE_group()
{
GROUP_original = block,
GROUP_sent = Msg,
GROUP_dateTime = DateTime.Now,
GROUP_from = From,
GROUP_seq = seq,
GROUP_prt = PRT,
};
TE1.AddToTREE_group(tg);
TE1.SaveChanges();
GID = tg.GROUP_ID;
}
//--------validate addresses---------------
foreach (string TB in Alladd)
{
string email = "";
string typeB = "";
TREEEntities TE = new TREEEntities();
var q1 = from x in TE.TREE_users where x.USR_TypeB == TB && x.USR_flag == "act" select new { x.USR_email, x.USR_TypeB };
foreach (var itm in q1)
{
email = itm.USR_email;
typeB = itm.USR_TypeB;
}
//-------send mail if the user exist----
if (TB == typeB)
{
if (typeB == "BAHMVGF")
{
addToFtl(block);
}
try
{
sendMail SM = new sendMail();
SM.SendMail(Msg, "Message from: " + From, email);
//---save record in DB----
addToMsg(typeB, email,"sent","act",1,GID,seq);
sent.Add(typeB);
}
catch (Exception x)
{
addToMsg(typeB, email, "Failed", "act", 1, GID, seq);
addToLog(DateTime.Now, "Send message failed: " + x.Message, GID, "Service", "Warning");
}
}
//-------if no user exist----
else
{
if (TB == "BAHMVGF")
{
addToFtl(block);
}
addToMsg(TB, "No email", "Failed", "act", 1, GID, seq);
addToLog(DateTime.Now, "Send message failed, unknown Type-B address: " + TB, GID, "Service", "Warning");
}
}
if (sent.Count < Alladd.Count())//<--- if there is unsent addresses
{
StringBuilder b = new StringBuilder(block);
foreach (string add in sent)
{
b.Replace(add, "");//<--- remove address that has been sent from the original message and write new msg. to unsent folder
}
if (!Directory.Exists(MsgsPath + "\Unsent"))
{
Directory.CreateDirectory(MsgsPath + "\Unsent");
}
using (StreamWriter w = File.AppendText(MsgsPath + "\Unsent\" + e.Name))
{
w.WriteLine(b);
}
}
sent.Clear();
//---add to dupList to validate the next messages-------------
if (dupList.Count > DupCash)
{
dupList.RemoveAt(0);
}
dupList.Add(block);
//---move msg to archive folder-----------------
if (!Directory.Exists(MsgsPath + "\Archive"))
{
Directory.CreateDirectory(MsgsPath + "\Archive");
}
File.Move(MsgsPath + "\" + e.Name, MsgsPath + "\Archive\" + e.Name);
}
else //<--- if message is a duplicate
{
addToLog(DateTime.Now, "Duplicated message, message not sent", seq, "Service", "Info");
//---move msg to duplicates folder-----------------
if (!Directory.Exists(MsgsPath + "\Duplicates"))
{
Directory.CreateDirectory(MsgsPath + "\Duplicates");
}
File.Move(MsgsPath + "\" + e.Name, MsgsPath + "\Duplicates\" + e.Name);
}
}
catch (Exception x)
{
addToLog(DateTime.Now, "Error: " + x.Message, seq, "Service", "Alert");
if (!Directory.Exists(MsgsPath + "\Unsent"))
{
Directory.CreateDirectory(MsgsPath + "\Unsent");
}
//---move msg to Unsent folder-----------------
File.Move(MsgsPath + "\" + e.Name, MsgsPath + "\Unsent\" + e.Name);
}
}
我想将其添加为评论,但它超出了允许的字符数,所以这里。
我在您的代码中注意到的第一件事是,您正在 Created 事件处理程序中执行所有文件处理。这不是一个好的做法,您应该始终让您的 FileSystemWatcher 事件处理程序做最少的工作,因为它可能会导致溢出,这可能就是您所面临的。
相反,最好将工作委托给一个单独的线程。您可以将事件添加到队列中,让后台线程负责处理它们。您可以在事件处理程序中过滤文件,这样您的队列就不会被垃圾填满。
在事件处理程序中使用 Sleep 也被认为是一种不好的做法,因为您将阻止 FileSystemWatcher 事件。
允许的最大缓冲区大小为 64K,这不是推荐的缓冲区大小,除非您正在处理长路径。增加缓冲区大小是昂贵的,因为它来自无法换出到磁盘的非分页内存,因此请保持缓冲区尽可能小。为避免缓冲区溢出,请使用 NotifyFilter and IncludeSubdirectories 属性过滤掉不需要的更改通知。
最后我建议阅读 FileSystemWatcher MSDN article 并在尝试编写代码之前查看几个在线示例,因为 Windows 观察器有些脆弱且容易出错
好的,所以这次我创建了一个带有文件观察器的服务来处理创建后的文件。
当正在处理的文件达到 1000 时(我正在接收大量消息),我的服务似乎崩溃了。
这是我的逻辑:文件进来,文件观察者读取文本发送到电子邮件,插入数据库,将原始消息移动到文件夹。
在服务启动时,我在开始观看之前先处理待处理的消息(我说的是超过 1000 个待处理的文本文件)并且我的服务需要大约一秒钟来处理每个文件。
一切正常,但是当传入文件总数达到 1000 时,它就崩溃了。 有时服务会停止处理挂起,只开始寻找新文件。
我有 "InternalBufferSize = 64000" 建议的最大值。
请帮助我的代码(我知道它应该是多线程的以便更好地处理,但我不是那个专家):
protected override void OnStart(string[] args)
{
using(TREEEntities TEX = new TREEEntities())
{
var mp= TEX.TREE_settings.FirstOrDefault(x=>x.SET_key =="MSGDump");
MsgsPath = mp.SET_value;
var dc = TEX.TREE_settings.FirstOrDefault(x => x.SET_key == "DupCash");
DupCash = Convert.ToInt16(dc.SET_value);
}
if (Directory.Exists(MsgsPath))
{
if (!Directory.Exists(MsgsPath+"\Archive"))
{
Directory.CreateDirectory(MsgsPath+"\Archive");
}
if (!Directory.Exists(MsgsPath + "\Duplicates"))
{
Directory.CreateDirectory(MsgsPath + "\Duplicates");
}
if (!Directory.Exists(MsgsPath + "\Unsent"))
{
Directory.CreateDirectory(MsgsPath + "\Unsent");
}
}
else
{
Directory.CreateDirectory(MsgsPath);
Directory.CreateDirectory(MsgsPath + "\Archive");
Directory.CreateDirectory(MsgsPath + "\Duplicates");
Directory.CreateDirectory(MsgsPath + "\Unsent");
}
processPending();//<--- process pending files after last service stop
fileSystemWatcher1.Path = MsgsPath;//<--- path to be watched
fileSystemWatcher1.EnableRaisingEvents = true;
fileSystemWatcher1.InternalBufferSize = 64000;
addToLog(DateTime.Now, "Service Started", 0, "Service", "Info");
addToLog(DateTime.Now, "File Watcher Started", 0, "Service", "Info");
//dupList.Clear();//<--- clear duplicates validation list
}
protected override void OnStop()
{
fileSystemWatcher1.EnableRaisingEvents = false;
addToLog(DateTime.Now, "File Watcher Stopped", 0, "Service", "Alert");
addToLog(DateTime.Now, "Service Stopped", 0, "Service", "Alert");
}
private void fileSystemWatcher1_Created(object sender, FileSystemEventArgs e)
{
try
{
//---------read from file------------
Thread.Sleep(200);//<---give the file some time to get released
string block;
using (StreamReader sr = File.OpenText(MsgsPath + "\" + e.Name))
{
block = sr.ReadToEnd();
}
PRT = block.Substring(block.Length - 6, 6);//<--- get the printer name
seq = Convert.ToInt16(block.Substring(block.Length - 20, 20).Substring(0, 4));//<--- get the sequence number
switch (PRT)//<----track sequence number from the 3 printers
{
case "64261B"://<---prt1
int seqPlus1=0;
if(seqPrt1 == 9999)//<---ignore sequence change from 9999 to 1
{ seqPlus1 = 1; }
else { seqPlus1 = seqPrt1 + 1; }
if (seq != seqPlus1 && seqPrt1 != 0)//<---"0" to avoid first service start
{
int x = seq - seqPrt1 - 1;
for (int i = 1; i <= x; i++)
{
addToMissing(PRT, seqPlus1);
addToLog(DateTime.Now, "Missing Sequence Number On Printer: " + PRT + " - " + seqPlus1, seqPlus1, "Service", "Missing");
seqPlus1++;
}
seqPrt1 = seq;
}
else { seqPrt1 = seq; }
break;
case "24E9AA"://<---prt2
int seqPlus2=0;
if(seqPrt2 == 9999)
{ seqPlus2 = 1; }
if (seq != seqPlus2 && seqPrt2 != 0)
{
int x = seq - seqPrt2 - 1;
for (int i = 1; i <= x; i++)
{
addToMissing(PRT, seqPlus2);
addToLog(DateTime.Now, "Missing Sequence Number On Printer: " + PRT + " - " + seqPlus2, seqPlus2, "Service", "Missing");
seqPlus2++;
}
seqPrt2 = seq;
}
else { seqPrt2 = seq; }
break;
case "642602"://<---prt3
int seqPlus3=0;
if(seqPrt3 == 9999)
{ seqPlus3 = 1; }
if (seq != seqPlus3 && seqPrt3 != 0)
{
int x = seq - seqPrt3 - 1;
for (int i = 1; i <= x; i++)
{
addToMissing(PRT, seqPlus3);
addToLog(DateTime.Now, "Missing Sequence Number On Printer: " + PRT + " - " + seqPlus3, seqPlus3, "Service", "Missing");
seqPlus3++;
}
seqPrt3 = seq;
}
else { seqPrt3 = seq; }
break;
}
block = block.Remove(block.Length - 52);//<--- trim the sequence number and unwanted info
string[] Alladd;
List<string> sent = new List<string>();
if (!dupList.Contains(block)) //<--- if msg not found in duplicates validation list
{
//--------extract values--------------
if (block.Substring(0, 3) == "\r\nQ") //<--- if the msg. contains a priority code
{
Alladd = block.Substring(0, block.IndexOf(".")).Replace("\r\n", " ").Substring(4).Split(' ').Distinct().Where(x => !string.IsNullOrEmpty(x)).ToArray(); ;
}
else//<--- if no priority code
{
Alladd = block.Substring(0, block.IndexOf(".")).Replace("\r\n", " ").Substring(1).Split(' ').Distinct().Where(x => !string.IsNullOrEmpty(x)).ToArray(); ;
}
string From = block.Substring(block.IndexOf('.') + 1).Substring(0, 7);
string Msg = block.Substring(block.IndexOf('.') + 1);
Msg = Msg.Substring(Msg.IndexOf('\n') + 1);
//--------add msg content to the DB group table--------
using (TREEEntities TE1 = new TREEEntities())
{
TREE_group tg = new TREE_group()
{
GROUP_original = block,
GROUP_sent = Msg,
GROUP_dateTime = DateTime.Now,
GROUP_from = From,
GROUP_seq = seq,
GROUP_prt = PRT,
};
TE1.AddToTREE_group(tg);
TE1.SaveChanges();
GID = tg.GROUP_ID;
}
//--------validate addresses---------------
foreach (string TB in Alladd)
{
string email = "";
string typeB = "";
TREEEntities TE = new TREEEntities();
var q1 = from x in TE.TREE_users where x.USR_TypeB == TB && x.USR_flag == "act" select new { x.USR_email, x.USR_TypeB };
foreach (var itm in q1)
{
email = itm.USR_email;
typeB = itm.USR_TypeB;
}
//-------send mail if the user exist----
if (TB == typeB)
{
if (typeB == "BAHMVGF")
{
addToFtl(block);
}
try
{
sendMail SM = new sendMail();
SM.SendMail(Msg, "Message from: " + From, email);
//---save record in DB----
addToMsg(typeB, email,"sent","act",1,GID,seq);
sent.Add(typeB);
}
catch (Exception x)
{
addToMsg(typeB, email, "Failed", "act", 1, GID, seq);
addToLog(DateTime.Now, "Send message failed: " + x.Message, GID, "Service", "Warning");
}
}
//-------if no user exist----
else
{
if (TB == "BAHMVGF")
{
addToFtl(block);
}
addToMsg(TB, "No email", "Failed", "act", 1, GID, seq);
addToLog(DateTime.Now, "Send message failed, unknown Type-B address: " + TB, GID, "Service", "Warning");
}
}
if (sent.Count < Alladd.Count())//<--- if there is unsent addresses
{
StringBuilder b = new StringBuilder(block);
foreach (string add in sent)
{
b.Replace(add, "");//<--- remove address that has been sent from the original message and write new msg. to unsent folder
}
if (!Directory.Exists(MsgsPath + "\Unsent"))
{
Directory.CreateDirectory(MsgsPath + "\Unsent");
}
using (StreamWriter w = File.AppendText(MsgsPath + "\Unsent\" + e.Name))
{
w.WriteLine(b);
}
}
sent.Clear();
//---add to dupList to validate the next messages-------------
if (dupList.Count > DupCash)
{
dupList.RemoveAt(0);
}
dupList.Add(block);
//---move msg to archive folder-----------------
if (!Directory.Exists(MsgsPath + "\Archive"))
{
Directory.CreateDirectory(MsgsPath + "\Archive");
}
File.Move(MsgsPath + "\" + e.Name, MsgsPath + "\Archive\" + e.Name);
}
else //<--- if message is a duplicate
{
addToLog(DateTime.Now, "Duplicated message, message not sent", seq, "Service", "Info");
//---move msg to duplicates folder-----------------
if (!Directory.Exists(MsgsPath + "\Duplicates"))
{
Directory.CreateDirectory(MsgsPath + "\Duplicates");
}
File.Move(MsgsPath + "\" + e.Name, MsgsPath + "\Duplicates\" + e.Name);
}
}
catch (Exception x)
{
addToLog(DateTime.Now, "Error: " + x.Message, seq, "Service", "Alert");
if (!Directory.Exists(MsgsPath + "\Unsent"))
{
Directory.CreateDirectory(MsgsPath + "\Unsent");
}
//---move msg to Unsent folder-----------------
File.Move(MsgsPath + "\" + e.Name, MsgsPath + "\Unsent\" + e.Name);
}
}
我想将其添加为评论,但它超出了允许的字符数,所以这里。
我在您的代码中注意到的第一件事是,您正在 Created 事件处理程序中执行所有文件处理。这不是一个好的做法,您应该始终让您的 FileSystemWatcher 事件处理程序做最少的工作,因为它可能会导致溢出,这可能就是您所面临的。 相反,最好将工作委托给一个单独的线程。您可以将事件添加到队列中,让后台线程负责处理它们。您可以在事件处理程序中过滤文件,这样您的队列就不会被垃圾填满。
在事件处理程序中使用 Sleep 也被认为是一种不好的做法,因为您将阻止 FileSystemWatcher 事件。
允许的最大缓冲区大小为 64K,这不是推荐的缓冲区大小,除非您正在处理长路径。增加缓冲区大小是昂贵的,因为它来自无法换出到磁盘的非分页内存,因此请保持缓冲区尽可能小。为避免缓冲区溢出,请使用 NotifyFilter and IncludeSubdirectories 属性过滤掉不需要的更改通知。
最后我建议阅读 FileSystemWatcher MSDN article 并在尝试编写代码之前查看几个在线示例,因为 Windows 观察器有些脆弱且容易出错