对 concurrentQueue 的误解,单个消费者在自己的线程上从队列中工作
misunderstanding of concurrentQueue, a single consumer working from the queue on it's own thread
我在创建一个正常运行的 SystemFileWatcher 时遇到了问题,它获取创建的事件并将其存储在队列中以供单独的线程使用。我在这里阅读了无数关于这个问题的话题,但我无法理解这个特定问题。
using System;
using System.IO;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Collections;
using System.Threading;
namespace FileSystemWatcherTest
{
class Program
{
public static BlockingCollection<string> processCollection = new BlockingCollection<string>(new ConcurrentQueue<string>());
static void Main(string[] args)
{
string path = @"C:\test\";
FileSystemWatcher watcher = new FileSystemWatcher();
watcher.Path = path;
watcher.EnableRaisingEvents = true;
watcher.Filter = "*.*";
watcher.Created += new FileSystemEventHandler(onCreated);
Thread Consumer = new Thread(new ThreadStart(mover));
Consumer.Start();
while (true) ;//run infinite loop so program doesn't terminate untill we force it.
}
static void onCreated(object sender, FileSystemEventArgs e)
{
processCollection.Add(e.FullPath);
}
static void mover()
{
string current;
string processed = @"C:\test\processed\";
while (true)
{
while (processCollection.IsCompleted)
{
Thread.Sleep(1000);
}
while (processCollection.TryTake(out current))
{
System.IO.File.Move(current, processed);
}
}
}
}
}
这就是我要测试的。我知道这是行不通的。当我将文件放入队列时,我只是简单地写入控制台,我已经验证了 FSW 的工作原理。当我尝试在它自己的线程中启动移动函数时,我的问题就开始了。一旦我开始处理队列,移动函数和 onCreated 似乎就不会通信。
我对这段代码的期望是在它自己的线程中启动移动函数,并 运行 它与 SFW 一起启动。我的期望是附加到 blockingcollection 的并发队列自动更新(我通过 onCreated 将一个项目排入队列,移动者看到它现在对该队列有 +1。移动者从队列中取出一个,onCreated 看到这个。)我是可能错误地使用了 Thread.Sleep。我不再有使用 blockingcollection 的支持理由(我最初选择它来处理等待队列填满,并且基本上不断检查队列中是否有要处理的项目)并且愿意将其更改为任何可能有效的方法。我见过锁的使用,但据我了解,由于 concurrentQueue 的同步方式,这并不是真正必要的。
最终目标是处理随机进入的大量小文件,并且在任何给定时间范围从 1 个到数百个不等。这些文件是 .EML。
如果可能的话,我将不胜感激解释正在发生的事情以及解决此问题的建议。我谦虚地来到这里,希望被告知我所理解的一切都是错误的!
编辑:我正在将其作为控制台应用程序进行测试,但之后它将用作服务。我添加了 while (true) ;在 onCreated() 之前保持 FSW 运行ning.
您的代码示例中存在几个不同的问题:
- 您误用了
File.Move()
方法。它要求两个参数都是完整文件 name。您将目录名称作为第二个参数传递,这是不正确的。
- 您正在检查集合的
IsCompleted
属性,好像这会有用。它总是 false
,因此该代码块什么都不做。这就引出了下一个问题……
- 您的线程 运行 处于紧密循环中,消耗了大量 CPU 时间。这可能会或可能不会导致错误,但它可能......
FileSystemWatcher
实际上并不能保证总是报告更改,它可能不会的原因之一是如果它没有足够的 CPU 时间来监视文件系统。如果您用完所有 CPU 时间来饿死它,您可能会发现它根本不报告更改。请注意,此问题也存在于您的主线程中;它也是 运行 一个紧密的循环,消耗大量的 CPU 时间无所事事。所以你完全占据了你系统的两个核心。
- 您未能利用
BlockingCollection
设计的 producer/consumer 执行模型。您应该让您的工作线程枚举 GetConsumingEnumerable()
返回的枚举,使用 CompleteAdding()
方法向该线程发出没有更多工作的信号。
这是您的代码示例的一个版本,它纠正了上述错误,并对示例进行了一些清理,使其更加独立:
// The default backing collection for BlockingCollection<T>
// is ConcurrentQueue<T>. There's no need to specify that
// explicitly.
public static BlockingCollection<string> processCollection = new BlockingCollection<string>();
static void Main(string[] args)
{
string testDirectory = Path.Combine(Environment.CurrentDirectory, "test");
Console.WriteLine("Creating directory: \"{0}\"", testDirectory);
Directory.CreateDirectory(testDirectory);
FileSystemWatcher watcher = new FileSystemWatcher();
watcher.Path = testDirectory;
watcher.EnableRaisingEvents = true;
watcher.Filter = "*.*";
watcher.Created += new FileSystemEventHandler(onCreated);
Thread Consumer = new Thread(new ParameterizedThreadStart(mover));
Consumer.Start(testDirectory);
string text;
while ((text = Console.ReadLine()) != "")
{
string newFile = Path.Combine(testDirectory, text + ".txt");
File.WriteAllText(newFile, "Test file");
}
processCollection.CompleteAdding();
}
static void onCreated(object sender, FileSystemEventArgs e)
{
if (e.ChangeType == WatcherChangeTypes.Created)
{
processCollection.Add(e.FullPath);
}
}
static void mover(object testDirectory)
{
string processed = Path.Combine((string)testDirectory, "processed");
Console.WriteLine("Creating directory: \"{0}\"", processed);
Directory.CreateDirectory(processed);
foreach (string current in processCollection.GetConsumingEnumerable())
{
// Ensure that the file is in fact a file and not something else.
if (File.Exists(current))
{
System.IO.File.Move(current, Path.Combine(processed, Path.GetFileName(current)));
}
}
}
我在创建一个正常运行的 SystemFileWatcher 时遇到了问题,它获取创建的事件并将其存储在队列中以供单独的线程使用。我在这里阅读了无数关于这个问题的话题,但我无法理解这个特定问题。
using System;
using System.IO;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Collections;
using System.Threading;
namespace FileSystemWatcherTest
{
class Program
{
public static BlockingCollection<string> processCollection = new BlockingCollection<string>(new ConcurrentQueue<string>());
static void Main(string[] args)
{
string path = @"C:\test\";
FileSystemWatcher watcher = new FileSystemWatcher();
watcher.Path = path;
watcher.EnableRaisingEvents = true;
watcher.Filter = "*.*";
watcher.Created += new FileSystemEventHandler(onCreated);
Thread Consumer = new Thread(new ThreadStart(mover));
Consumer.Start();
while (true) ;//run infinite loop so program doesn't terminate untill we force it.
}
static void onCreated(object sender, FileSystemEventArgs e)
{
processCollection.Add(e.FullPath);
}
static void mover()
{
string current;
string processed = @"C:\test\processed\";
while (true)
{
while (processCollection.IsCompleted)
{
Thread.Sleep(1000);
}
while (processCollection.TryTake(out current))
{
System.IO.File.Move(current, processed);
}
}
}
}
}
这就是我要测试的。我知道这是行不通的。当我将文件放入队列时,我只是简单地写入控制台,我已经验证了 FSW 的工作原理。当我尝试在它自己的线程中启动移动函数时,我的问题就开始了。一旦我开始处理队列,移动函数和 onCreated 似乎就不会通信。
我对这段代码的期望是在它自己的线程中启动移动函数,并 运行 它与 SFW 一起启动。我的期望是附加到 blockingcollection 的并发队列自动更新(我通过 onCreated 将一个项目排入队列,移动者看到它现在对该队列有 +1。移动者从队列中取出一个,onCreated 看到这个。)我是可能错误地使用了 Thread.Sleep。我不再有使用 blockingcollection 的支持理由(我最初选择它来处理等待队列填满,并且基本上不断检查队列中是否有要处理的项目)并且愿意将其更改为任何可能有效的方法。我见过锁的使用,但据我了解,由于 concurrentQueue 的同步方式,这并不是真正必要的。
最终目标是处理随机进入的大量小文件,并且在任何给定时间范围从 1 个到数百个不等。这些文件是 .EML。
如果可能的话,我将不胜感激解释正在发生的事情以及解决此问题的建议。我谦虚地来到这里,希望被告知我所理解的一切都是错误的!
编辑:我正在将其作为控制台应用程序进行测试,但之后它将用作服务。我添加了 while (true) ;在 onCreated() 之前保持 FSW 运行ning.
您的代码示例中存在几个不同的问题:
- 您误用了
File.Move()
方法。它要求两个参数都是完整文件 name。您将目录名称作为第二个参数传递,这是不正确的。 - 您正在检查集合的
IsCompleted
属性,好像这会有用。它总是false
,因此该代码块什么都不做。这就引出了下一个问题…… - 您的线程 运行 处于紧密循环中,消耗了大量 CPU 时间。这可能会或可能不会导致错误,但它可能......
FileSystemWatcher
实际上并不能保证总是报告更改,它可能不会的原因之一是如果它没有足够的 CPU 时间来监视文件系统。如果您用完所有 CPU 时间来饿死它,您可能会发现它根本不报告更改。请注意,此问题也存在于您的主线程中;它也是 运行 一个紧密的循环,消耗大量的 CPU 时间无所事事。所以你完全占据了你系统的两个核心。 - 您未能利用
BlockingCollection
设计的 producer/consumer 执行模型。您应该让您的工作线程枚举GetConsumingEnumerable()
返回的枚举,使用CompleteAdding()
方法向该线程发出没有更多工作的信号。
这是您的代码示例的一个版本,它纠正了上述错误,并对示例进行了一些清理,使其更加独立:
// The default backing collection for BlockingCollection<T>
// is ConcurrentQueue<T>. There's no need to specify that
// explicitly.
public static BlockingCollection<string> processCollection = new BlockingCollection<string>();
static void Main(string[] args)
{
string testDirectory = Path.Combine(Environment.CurrentDirectory, "test");
Console.WriteLine("Creating directory: \"{0}\"", testDirectory);
Directory.CreateDirectory(testDirectory);
FileSystemWatcher watcher = new FileSystemWatcher();
watcher.Path = testDirectory;
watcher.EnableRaisingEvents = true;
watcher.Filter = "*.*";
watcher.Created += new FileSystemEventHandler(onCreated);
Thread Consumer = new Thread(new ParameterizedThreadStart(mover));
Consumer.Start(testDirectory);
string text;
while ((text = Console.ReadLine()) != "")
{
string newFile = Path.Combine(testDirectory, text + ".txt");
File.WriteAllText(newFile, "Test file");
}
processCollection.CompleteAdding();
}
static void onCreated(object sender, FileSystemEventArgs e)
{
if (e.ChangeType == WatcherChangeTypes.Created)
{
processCollection.Add(e.FullPath);
}
}
static void mover(object testDirectory)
{
string processed = Path.Combine((string)testDirectory, "processed");
Console.WriteLine("Creating directory: \"{0}\"", processed);
Directory.CreateDirectory(processed);
foreach (string current in processCollection.GetConsumingEnumerable())
{
// Ensure that the file is in fact a file and not something else.
if (File.Exists(current))
{
System.IO.File.Move(current, Path.Combine(processed, Path.GetFileName(current)));
}
}
}