带有文件系统观察器的 NServicebus

NServicebus with File System Watcher

我希望我的端点在检测到特定文件夹中的文件被删除时发出事件。我能够通过使用实现 IWantToRunWhenBusStartsAndStops 的 class 来让它工作,它反过来设置一个 FileSystemWatcher 来监视给定的文件夹。我的问题是,这是使用 nservicebus 解决此问题的最佳方法,还是我遗漏了一些可能给我带来麻烦的东西?

这是我的代码:

public class FileSystem : IWantToRunWhenBusStartsAndStops
{
    private FileSystemWatcher watcher;

    public void Start()
    {
        ConfigFileWatcher();
    }

    public void Stop()
    {

    }

    [PermissionSet(SecurityAction.Demand, Name = "FullTrust")]
    private void ConfigFileWatcher()
    {
        watcher = new FileSystemWatcher();
        watcher.Path = @"c:\";
        /* Watch for changes in LastAccess and LastWrite times, and
           the renaming of files or directories. */
        watcher.NotifyFilter = NotifyFilters.LastAccess | NotifyFilters.LastWrite
           | NotifyFilters.FileName | NotifyFilters.DirectoryName;
        // Only watch text files.
        watcher.Filter = "*.txt";

        // Add event handlers.
        watcher.Changed += new FileSystemEventHandler(OnChanged);
        watcher.Created += new FileSystemEventHandler(OnChanged);
        watcher.Deleted += new FileSystemEventHandler(OnChanged);

        // Begin watching.
        watcher.EnableRaisingEvents = true;
    }

    // Define the event handlers. 
    private static void OnChanged(object source, FileSystemEventArgs e)
    {
        // Specify what is done when a file is changed, created, or deleted.
        Console.WriteLine("File: " + e.FullPath + " " + e.ChangeType);

        // fire off an event here...
    }

}

如果你看一下NServiceBus的源码,在容器初始化的时候,你会看到IWantToRunWhenBusStartsAndStops注册了一个per call的生命周期

ForAllTypes<IWantToRunWhenBusStartsAndStops>(TypesToScan, t => configurer.ConfigureComponent(t, DependencyLifecycle.InstancePerCall));

这意味着 class 将在 Start() 被调用后被释放。您的实施之所以有效,是因为您的事件订阅了一个静态处理程序,它使订阅保持活动状态。

我们在生产中使用文件观察器,但我们将它们烘焙为高级卫星。卫星保证初始化为单例,不会被丢弃。它们也有 Start 和 Stop 方法。它们确实有地址并且应该能够处理传入的消息,但是您可以使用一些虚拟地址并且在处理程序中什么也不做,除非您想让您的文件系统观察器卫星双向(即接收消息并将它们作为文件放在磁盘上)。

在 NServiceBus 中,建议 经常 运行 将非一次性进程作为卫星。许多 NServiceBus 组件被制成卫星。

您可能会对如何自己制作卫星感到好奇,但这很容易做到。您可以检查接口签名here

看起来像这样

using System;
using System.IO;
using NServiceBus;
using NServiceBus.Satellites;

public class FileSystem : ISatellite
{
    private FileSystemWatcher _watcher;

    public bool Handle(TransportMessage message)
    {
        return true;
    }

    public void Start()
    {
        _watcher = new FileSystemWatcher
        {
            Path = @"c:\",
            NotifyFilter = NotifyFilters.LastAccess |    NotifyFilters.LastWrite
                           | NotifyFilters.FileName | NotifyFilters.DirectoryName,
            Filter = "*.txt"
        };
        _watcher.Changed += OnChanged;
        _watcher.Created += OnChanged;
        _watcher.Deleted += OnChanged;
        _watcher.EnableRaisingEvents = true;
    }

    public void Stop()
    {
        _watcher.Dispose();
    }

    public Address InputAddress
    {
        get { return Address.Parse("FileSystemSatellite"); }
    }

    public bool Disabled
    {
        get { return false; }
    }

    // Define the event handlers. 
    private void OnChanged(object source, FileSystemEventArgs e)
    {
        // Specify what is done when a file is changed, created, or    deleted.
        Console.WriteLine("File: " + e.FullPath + " " + e.ChangeType);

        // fire off an event here...
    }
}

您需要记住一件事:每颗卫星都有自己的队列。在这种情况下,它将始终为空。

根据其他回答和评论,我相信您的 FileSystemWatcher 通过订阅 OnChanged 事件而保持活动状态。

这实际上是内存泄漏的(相当)常见原因,但在您的情况下,它实际上将您的观察者保留在内存中并允许它继续运行。从 OnChanged() 方法中删除 static 关键字,我相信您会看到预期的行为。

当然,您希望 FileSystemWatcher 继续运行。 . .一种选择可能是让观察者变量本身成为静态变量。如果你走那条路,我会把它放在一个单独的 class 中,然后通过实现 IWantToRunWhenBusStartsAndStops 的 class 初始化它。