如何改进这个 Rx FolderWatcher?

How to improve this Rx FolderWatcher?

我从这里提供的示例开始我的示例:http://www.jaylee.org/post/2012/08/26/An-update-to-matthieumezil-Rx-and-the-FileSystemWatcher.aspx 但问题是,如果您一直在观看一个包含许多文件的文件夹 modified/created/deleted,它永远不会 return 一个事件,因为油门永远不会停止。

我需要的是为每个被更改的文件创建一个新的临时流,并且只发送最后一个事件(如果是删除事件,我们不通知)。

我认为我的示例工作正常,但由于这是我第一次使用 Rx,我想就如何简化它获得一些反馈。

这里是:

public class FileWatcher
{
    public class FileChangedEvent
    {
        public string FullPath { get; private set; }
        public bool IsFileDeleted { get; private set; }

        public FileChangedEvent(string path, bool isFileDeleted = false)
        {
            FullPath = path;
            IsFileDeleted = isFileDeleted;
        }
    }

    public static IObservable<FileChangedEvent> ObserveFolderChanges(string path, string filter, TimeSpan throttle)
    {
        return Observable.Create<FileChangedEvent>(
            observer =>
            {
                var fileSystemWatcher = new FileSystemWatcher(path, filter) { EnableRaisingEvents = true };

                var sources = CreateSources(fileSystemWatcher);

                var fileSources = new ConcurrentDictionary<string, Subject<FileChangedEvent>>();
                return sources.Merge().Subscribe(fileChange =>
                {
                    Subject<FileChangedEvent> fileSubject = fileSources.GetOrAdd(fileChange.FullPath, (key) =>
                    {
                        //Create a new stream for this file.
                        var addedFileSubject = new Subject<FileChangedEvent>();
                        addedFileSubject.Throttle(throttle).Subscribe(lastFileChange =>
                        {
                            if (lastFileChange != null)
                            {
                                Subject<FileChangedEvent> dummy;
                                fileSources.TryRemove(lastFileChange.FullPath, out dummy);

                                //Only send the event if the file was not deleted.
                                if (!lastFileChange.IsFileDeleted)
                                {
                                    observer.OnNext(lastFileChange);
                                }
                            }
                        });
                        return addedFileSubject;
                    });

                    fileSubject.OnNext(fileChange);
                });
            }
        );
    }

    private static IObservable<FileChangedEvent>[] CreateSources(FileSystemWatcher fileWatcher)
    {
        return new[] 
        { 
            Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs >(handler => fileWatcher.Created += handler, handler => fileWatcher.Created -= handler)
                        .Select(ev => new FileChangedEvent(ev.EventArgs.FullPath)), 

            Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs >(handler => fileWatcher.Deleted += handler, handler => fileWatcher.Deleted -= handler)
                        .Select(ev => new FileChangedEvent(ev.EventArgs.FullPath, true)),

            Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs >(handler => fileWatcher.Changed += handler, handler => fileWatcher.Changed -= handler)
                        .Select(ev => new FileChangedEvent(ev.EventArgs.FullPath)),

            //The rename source needs to send a delete event for the old file name.
            Observable.Create<FileChangedEvent>(nameChangedObserver =>
            {
                return Observable.FromEventPattern<RenamedEventHandler, RenamedEventArgs>(handler => fileWatcher.Renamed += handler, handler => fileWatcher.Renamed -= handler)
                    .Subscribe(ev =>
                    {
                        nameChangedObserver.OnNext(new FileChangedEvent(ev.EventArgs.FullPath));
                        nameChangedObserver.OnNext(new FileChangedEvent(ev.EventArgs.OldFullPath, true));
                    });
            }),

            Observable.FromEventPattern<ErrorEventHandler, ErrorEventArgs >(handler => fileWatcher.Error += handler, handler => fileWatcher.Error -= handler)
                        .SelectMany(ev => Observable.Throw<FileChangedEvent>(ev.EventArgs.GetException()))
        };
    }

    static void Main(string[] args)
    {
        var fileWatcher =
            FileWatcher.ObserveFolderChanges("Test Path Here", "*.*", TimeSpan.FromSeconds(30))
                        .Subscribe(fce => { if(fce != null) Console.WriteLine("Changed :" + fce.FullPath); }, e => Debug.WriteLine(e));

        Console.ReadLine();
    }
}

此用例是在文件路径上的文件上传完成时收到通知。

所以,我的问题:
1) 是否可以简化这个?
2) 我是不是漏了什么东西?
3) 与没有 Rx 的情况下这样做相比,性能有很大的损失吗?

谢谢!

是的,你泄露了 FileSystemWatcher 是的,它也可以被简化以消除所有与字典和主题相关的杂乱无章。

使用Observable.Using 管理观察者的生命周期。并使用 GroupBy 摆脱主题字典:

public static IObservable<FileChangedEvent> ObserveFolderChanges(string path, string filter, TimeSpan throttle)
{
    return Observable.Using(
        () => new FileSystemWatcher(path, filter) { EnableRaisingEvents = true },
        fileSystemWatcher => CreateSources(fileSystemWatcher)
            .Merge()
            .GroupBy(c => c.FullPath)
            .SelectMany(fileEvents => fileEvents
                .Throttle(throttle)
                .Where(e => !e.IsFileDeleted)));
}