对 concurrentQueue 的误解,单个消费者在自己的线程上从队列中工作

misunderstanding of concurrentQueue, a single consumer working from the queue on it's own thread

我在创建一个正常运行的 SystemFileWatcher 时遇到了问题,它获取创建的事件并将其存储在队列中以供单独的线程使用。我在这里阅读了无数关于这个问题的话题,但我无法理解这个特定问题。

using System;
using System.IO;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Collections;
using System.Threading;

namespace FileSystemWatcherTest
{
    class Program
    {
        public static BlockingCollection<string> processCollection = new BlockingCollection<string>(new ConcurrentQueue<string>());

    static void Main(string[] args)
    {
        string path = @"C:\test\";
        FileSystemWatcher watcher = new FileSystemWatcher();

        watcher.Path = path;
        watcher.EnableRaisingEvents = true;
        watcher.Filter = "*.*";

        watcher.Created += new FileSystemEventHandler(onCreated);
        Thread Consumer = new Thread(new ThreadStart(mover));
        Consumer.Start();


        while (true) ;//run infinite loop so program doesn't terminate untill we force it.
    }
    static void onCreated(object sender, FileSystemEventArgs e)
    {
        processCollection.Add(e.FullPath);     
    }

    static void mover()
    {
        string current;
        string processed = @"C:\test\processed\";
        while (true)
        {
            while (processCollection.IsCompleted)
            {
                Thread.Sleep(1000);
            }
            while (processCollection.TryTake(out current))
            {
                System.IO.File.Move(current, processed);
            }
        }
    }
}

}

这就是我要测试的。我知道这是行不通的。当我将文件放入队列时,我只是简单地写入控制台,我已经验证了 FSW 的工作原理。当我尝试在它自己的线程中启动移动函数时,我的问题就开始了。一旦我开始处理队列,移动函数和 onCreated 似乎就不会通信。

我对这段代码的期望是在它自己的线程中启动移动函数,并 运行 它与 SFW 一起启动。我的期望是附加到 blockingcollection 的并发队列自动更新(我通过 onCreated 将一个项目排入队列,移动者看到它现在对该队列有 +1。移动者从队列中取出一个,onCreated 看到这个。)我是可能错误地使用了 Thread.Sleep。我不再有使用 blockingcollection 的支持理由(我最初选择它来处理等待队列填满,并且基本上不断检查队列中是否有要处理的项目)并且愿意将其更改为任何可能有效的方法。我见过锁的使用,但据我了解,由于 concurrentQueue 的同步方式,这并不是真正必要的。

最终目标是处理随机进入的大量小文件,并且在任何给定时间范围从 1 个到数百个不等。这些文件是 .EML。

如果可能的话,我将不胜感激解释正在发生的事情以及解决此问题的建议。我谦虚地来到这里,希望被告知我所理解的一切都是错误的!

编辑:我正在将其作为控制台应用程序进行测试,但之后它将用作服务。我添加了 while (true) ;在 onCreated() 之前保持 FSW 运行ning.

您的代码示例中存在几个不同的问题:

  1. 您误用了 File.Move() 方法。它要求两个参数都是完整文件 name。您将目录名称作为第二个参数传递,这是不正确的。
  2. 您正在检查集合的 IsCompleted 属性,好像这会有用。它总是 false,因此该代码块什么都不做。这就引出了下一个问题……
  3. 您的线程 运行 处于紧密循环中,消耗了大量 CPU 时间。这可能会或可能不会导致错误,但它可能......FileSystemWatcher 实际上并不能保证总是报告更改,它可能不会的原因之一是如果它没有足够的 CPU 时间来监视文件系统。如果您用完所有 CPU 时间来饿死它,您可能会发现它根本不报告更改。请注意,此问题也存在于您的主线程中;它也是 运行 一个紧密的循环,消耗大量的 CPU 时间无所事事。所以你完全占据了你系统的两个核心。
  4. 您未能利用 BlockingCollection 设计的 producer/consumer 执行模型。您应该让您的工作线程枚举 GetConsumingEnumerable() 返回的枚举,使用 CompleteAdding() 方法向该线程发出没有更多工作的信号。

这是您的代码示例的一个版本,它纠正了上述错误,并对示例进行了一些清理,使其更加独立:

// The default backing collection for BlockingCollection<T>
// is ConcurrentQueue<T>. There's no need to specify that
// explicitly.
public static BlockingCollection<string> processCollection = new BlockingCollection<string>();

static void Main(string[] args)
{
    string testDirectory = Path.Combine(Environment.CurrentDirectory, "test");

    Console.WriteLine("Creating directory: \"{0}\"", testDirectory);
    Directory.CreateDirectory(testDirectory);

    FileSystemWatcher watcher = new FileSystemWatcher();

    watcher.Path = testDirectory;
    watcher.EnableRaisingEvents = true;
    watcher.Filter = "*.*";

    watcher.Created += new FileSystemEventHandler(onCreated);
    Thread Consumer = new Thread(new ParameterizedThreadStart(mover));
    Consumer.Start(testDirectory);

    string text;

    while ((text = Console.ReadLine()) != "")
    {
        string newFile = Path.Combine(testDirectory, text + ".txt");

        File.WriteAllText(newFile, "Test file");
    }

    processCollection.CompleteAdding();
}

static void onCreated(object sender, FileSystemEventArgs e)
{
    if (e.ChangeType == WatcherChangeTypes.Created)
    {
        processCollection.Add(e.FullPath);
    }
}

static void mover(object testDirectory)
{
    string processed = Path.Combine((string)testDirectory, "processed");

    Console.WriteLine("Creating directory: \"{0}\"", processed);

    Directory.CreateDirectory(processed);

    foreach (string current in processCollection.GetConsumingEnumerable())
    {
        // Ensure that the file is in fact a file and not something else.
        if (File.Exists(current))
        {
            System.IO.File.Move(current, Path.Combine(processed, Path.GetFileName(current)));
        }
    }
}