如何改进这个 Rx FolderWatcher?
How to improve this Rx FolderWatcher?
我从这里提供的示例开始我的示例:http://www.jaylee.org/post/2012/08/26/An-update-to-matthieumezil-Rx-and-the-FileSystemWatcher.aspx
但问题是,如果您一直在观看一个包含许多文件的文件夹 modified/created/deleted,它永远不会 return 一个事件,因为油门永远不会停止。
我需要的是为每个被更改的文件创建一个新的临时流,并且只发送最后一个事件(如果是删除事件,我们不通知)。
我认为我的示例工作正常,但由于这是我第一次使用 Rx,我想就如何简化它获得一些反馈。
这里是:
public class FileWatcher
{
public class FileChangedEvent
{
public string FullPath { get; private set; }
public bool IsFileDeleted { get; private set; }
public FileChangedEvent(string path, bool isFileDeleted = false)
{
FullPath = path;
IsFileDeleted = isFileDeleted;
}
}
public static IObservable<FileChangedEvent> ObserveFolderChanges(string path, string filter, TimeSpan throttle)
{
return Observable.Create<FileChangedEvent>(
observer =>
{
var fileSystemWatcher = new FileSystemWatcher(path, filter) { EnableRaisingEvents = true };
var sources = CreateSources(fileSystemWatcher);
var fileSources = new ConcurrentDictionary<string, Subject<FileChangedEvent>>();
return sources.Merge().Subscribe(fileChange =>
{
Subject<FileChangedEvent> fileSubject = fileSources.GetOrAdd(fileChange.FullPath, (key) =>
{
//Create a new stream for this file.
var addedFileSubject = new Subject<FileChangedEvent>();
addedFileSubject.Throttle(throttle).Subscribe(lastFileChange =>
{
if (lastFileChange != null)
{
Subject<FileChangedEvent> dummy;
fileSources.TryRemove(lastFileChange.FullPath, out dummy);
//Only send the event if the file was not deleted.
if (!lastFileChange.IsFileDeleted)
{
observer.OnNext(lastFileChange);
}
}
});
return addedFileSubject;
});
fileSubject.OnNext(fileChange);
});
}
);
}
private static IObservable<FileChangedEvent>[] CreateSources(FileSystemWatcher fileWatcher)
{
return new[]
{
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs >(handler => fileWatcher.Created += handler, handler => fileWatcher.Created -= handler)
.Select(ev => new FileChangedEvent(ev.EventArgs.FullPath)),
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs >(handler => fileWatcher.Deleted += handler, handler => fileWatcher.Deleted -= handler)
.Select(ev => new FileChangedEvent(ev.EventArgs.FullPath, true)),
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs >(handler => fileWatcher.Changed += handler, handler => fileWatcher.Changed -= handler)
.Select(ev => new FileChangedEvent(ev.EventArgs.FullPath)),
//The rename source needs to send a delete event for the old file name.
Observable.Create<FileChangedEvent>(nameChangedObserver =>
{
return Observable.FromEventPattern<RenamedEventHandler, RenamedEventArgs>(handler => fileWatcher.Renamed += handler, handler => fileWatcher.Renamed -= handler)
.Subscribe(ev =>
{
nameChangedObserver.OnNext(new FileChangedEvent(ev.EventArgs.FullPath));
nameChangedObserver.OnNext(new FileChangedEvent(ev.EventArgs.OldFullPath, true));
});
}),
Observable.FromEventPattern<ErrorEventHandler, ErrorEventArgs >(handler => fileWatcher.Error += handler, handler => fileWatcher.Error -= handler)
.SelectMany(ev => Observable.Throw<FileChangedEvent>(ev.EventArgs.GetException()))
};
}
static void Main(string[] args)
{
var fileWatcher =
FileWatcher.ObserveFolderChanges("Test Path Here", "*.*", TimeSpan.FromSeconds(30))
.Subscribe(fce => { if(fce != null) Console.WriteLine("Changed :" + fce.FullPath); }, e => Debug.WriteLine(e));
Console.ReadLine();
}
}
此用例是在文件路径上的文件上传完成时收到通知。
所以,我的问题:
1) 是否可以简化这个?
2) 我是不是漏了什么东西?
3) 与没有 Rx 的情况下这样做相比,性能有很大的损失吗?
谢谢!
是的,你泄露了 FileSystemWatcher
是的,它也可以被简化以消除所有与字典和主题相关的杂乱无章。
使用Observable.Using
管理观察者的生命周期。并使用 GroupBy
摆脱主题字典:
public static IObservable<FileChangedEvent> ObserveFolderChanges(string path, string filter, TimeSpan throttle)
{
return Observable.Using(
() => new FileSystemWatcher(path, filter) { EnableRaisingEvents = true },
fileSystemWatcher => CreateSources(fileSystemWatcher)
.Merge()
.GroupBy(c => c.FullPath)
.SelectMany(fileEvents => fileEvents
.Throttle(throttle)
.Where(e => !e.IsFileDeleted)));
}
我从这里提供的示例开始我的示例:http://www.jaylee.org/post/2012/08/26/An-update-to-matthieumezil-Rx-and-the-FileSystemWatcher.aspx 但问题是,如果您一直在观看一个包含许多文件的文件夹 modified/created/deleted,它永远不会 return 一个事件,因为油门永远不会停止。
我需要的是为每个被更改的文件创建一个新的临时流,并且只发送最后一个事件(如果是删除事件,我们不通知)。
我认为我的示例工作正常,但由于这是我第一次使用 Rx,我想就如何简化它获得一些反馈。
这里是:
public class FileWatcher
{
public class FileChangedEvent
{
public string FullPath { get; private set; }
public bool IsFileDeleted { get; private set; }
public FileChangedEvent(string path, bool isFileDeleted = false)
{
FullPath = path;
IsFileDeleted = isFileDeleted;
}
}
public static IObservable<FileChangedEvent> ObserveFolderChanges(string path, string filter, TimeSpan throttle)
{
return Observable.Create<FileChangedEvent>(
observer =>
{
var fileSystemWatcher = new FileSystemWatcher(path, filter) { EnableRaisingEvents = true };
var sources = CreateSources(fileSystemWatcher);
var fileSources = new ConcurrentDictionary<string, Subject<FileChangedEvent>>();
return sources.Merge().Subscribe(fileChange =>
{
Subject<FileChangedEvent> fileSubject = fileSources.GetOrAdd(fileChange.FullPath, (key) =>
{
//Create a new stream for this file.
var addedFileSubject = new Subject<FileChangedEvent>();
addedFileSubject.Throttle(throttle).Subscribe(lastFileChange =>
{
if (lastFileChange != null)
{
Subject<FileChangedEvent> dummy;
fileSources.TryRemove(lastFileChange.FullPath, out dummy);
//Only send the event if the file was not deleted.
if (!lastFileChange.IsFileDeleted)
{
observer.OnNext(lastFileChange);
}
}
});
return addedFileSubject;
});
fileSubject.OnNext(fileChange);
});
}
);
}
private static IObservable<FileChangedEvent>[] CreateSources(FileSystemWatcher fileWatcher)
{
return new[]
{
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs >(handler => fileWatcher.Created += handler, handler => fileWatcher.Created -= handler)
.Select(ev => new FileChangedEvent(ev.EventArgs.FullPath)),
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs >(handler => fileWatcher.Deleted += handler, handler => fileWatcher.Deleted -= handler)
.Select(ev => new FileChangedEvent(ev.EventArgs.FullPath, true)),
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs >(handler => fileWatcher.Changed += handler, handler => fileWatcher.Changed -= handler)
.Select(ev => new FileChangedEvent(ev.EventArgs.FullPath)),
//The rename source needs to send a delete event for the old file name.
Observable.Create<FileChangedEvent>(nameChangedObserver =>
{
return Observable.FromEventPattern<RenamedEventHandler, RenamedEventArgs>(handler => fileWatcher.Renamed += handler, handler => fileWatcher.Renamed -= handler)
.Subscribe(ev =>
{
nameChangedObserver.OnNext(new FileChangedEvent(ev.EventArgs.FullPath));
nameChangedObserver.OnNext(new FileChangedEvent(ev.EventArgs.OldFullPath, true));
});
}),
Observable.FromEventPattern<ErrorEventHandler, ErrorEventArgs >(handler => fileWatcher.Error += handler, handler => fileWatcher.Error -= handler)
.SelectMany(ev => Observable.Throw<FileChangedEvent>(ev.EventArgs.GetException()))
};
}
static void Main(string[] args)
{
var fileWatcher =
FileWatcher.ObserveFolderChanges("Test Path Here", "*.*", TimeSpan.FromSeconds(30))
.Subscribe(fce => { if(fce != null) Console.WriteLine("Changed :" + fce.FullPath); }, e => Debug.WriteLine(e));
Console.ReadLine();
}
}
此用例是在文件路径上的文件上传完成时收到通知。
所以,我的问题:
1) 是否可以简化这个?
2) 我是不是漏了什么东西?
3) 与没有 Rx 的情况下这样做相比,性能有很大的损失吗?
谢谢!
是的,你泄露了 FileSystemWatcher
是的,它也可以被简化以消除所有与字典和主题相关的杂乱无章。
使用Observable.Using
管理观察者的生命周期。并使用 GroupBy
摆脱主题字典:
public static IObservable<FileChangedEvent> ObserveFolderChanges(string path, string filter, TimeSpan throttle)
{
return Observable.Using(
() => new FileSystemWatcher(path, filter) { EnableRaisingEvents = true },
fileSystemWatcher => CreateSources(fileSystemWatcher)
.Merge()
.GroupBy(c => c.FullPath)
.SelectMany(fileEvents => fileEvents
.Throttle(throttle)
.Where(e => !e.IsFileDeleted)));
}