Task.ContinueWith 中的嵌套锁 - 安全,还是玩火?
Nested lock in Task.ContinueWith - Safe, or playing with fire?
Windows 服务:从目录列表生成一组 FileWatcher 对象以在配置文件中监视,具有以下要求:
- 文件处理可能很耗时 - 事件必须在它们自己的任务线程上处理
- 保留事件处理程序任务的句柄以等待 OnStop() 事件中的完成。
- 跟踪上传文件的哈希值;不一样就不要再处理
- 保留文件哈希值以允许 OnStart() 处理在服务关闭时上传的文件。
- 切勿多次处理一个文件。
(关于 #3,当没有变化时我们确实会收到事件...最值得注意的是因为 FileWatchers 的重复事件问题)
为了完成这些事情,我有两本字典 - 一本用于上传的文件,另一本用于任务本身。这两个对象都是静态的,我需要在 adding/removing/updating 个文件和任务时锁定它们。简化代码:
public sealed class TrackingFileSystemWatcher : FileSystemWatcher {
private static readonly object fileWatcherDictionaryLock = new object();
private static readonly object runningTaskDictionaryLock = new object();
private readonly Dictionary<int, Task> runningTaskDictionary = new Dictionary<int, Task>(15);
private readonly Dictionary<string, FileSystemWatcherProperties> fileWatcherDictionary = new Dictionary<string, FileSystemWatcherProperties>();
// Wired up elsewhere
private void OnChanged(object sender, FileSystemEventArgs eventArgs) {
this.ProcessModifiedDatafeed(eventArgs);
}
private void ProcessModifiedDatafeed(FileSystemEventArgs eventArgs) {
lock (TrackingFileSystemWatcher.fileWatcherDictionaryLock) {
// Read the file and generate hash here
// Properties if the file has been processed before
// ContainsNonNullKey is an extension method
if (this.fileWatcherDictionary.ContainsNonNullKey(eventArgs.FullPath)) {
try {
fileProperties = this.fileWatcherDictionary[eventArgs.FullPath];
}
catch (KeyNotFoundException keyNotFoundException) {}
catch (ArgumentNullException argumentNullException) {}
}
else {
// Create a new properties object
}
fileProperties.ChangeType = eventArgs.ChangeType;
fileProperties.FileContentsHash = md5Hash;
fileProperties.LastEventTimestamp = DateTime.Now;
Task task;
try {
task = new Task(() => new DatafeedUploadHandler().UploadDatafeed(this.legalOrg, datafeedFileData), TaskCreationOptions.LongRunning);
}
catch {
..
}
// Only lock long enough to add the task to the dictionary
lock (TrackingFileSystemWatcher.runningTaskDictionaryLock) {
try {
this.runningTaskDictionary.Add(task.Id, task);
}
catch {
..
}
}
try {
task.ContinueWith(t => {
try {
lock (TrackingFileSystemWatcher.runningTaskDictionaryLock) {
this.runningTaskDictionary.Remove(t.Id);
}
// Will this lock burn me?
lock (TrackingFileSystemWatcher.fileWatcherDictionaryLock) {
// Persist the file watcher properties to
// disk for recovery at OnStart()
}
}
catch {
..
}
});
task.Start();
}
catch {
..
}
}
}
}
如果委托是在同一对象的锁中定义的,那么在 ContinueWith()
委托中请求对 FileSystemWatcher 集合的锁有什么影响?我希望它很好,即使任务在 ProcessModifiedDatafeed()
释放锁之前开始、完成并进入 ContinueWith() ,任务线程也会被挂起,直到创建线程释放锁。但我想确保我没有踩到任何延迟执行地雷。
看看代码,我也许可以更快地释放锁,避免这个问题,但我还不确定...需要查看完整代码才能确定。
更新
为了阻止不断上升的 "this code is terrible" 评论,我有很好的理由来捕捉我所做的异常,并且我正在捕捉如此多的异常。这是一个带有多线程处理程序的 Windows 服务,它可能不会崩溃。曾经。如果这些线程中的任何一个有未处理的异常,它将执行的操作。
此外,这些例外情况将写入未来的防弹程序。我在下面的评论中给出的例子是为处理程序添加一个工厂......就像今天编写的代码一样,永远不会有空任务,但如果工厂没有正确实现,代码可能会抛出异常.是的,那应该在测试中被发现。但是,我的团队中有初级开发人员... "May. Not. Crash."(此外,如果 是 未处理的异常,它必须正常关闭,允许当前 - 运行要完成的线程 - 我们使用 main() 中设置的未处理异常处理程序来完成)。我们将企业级监视器配置为在事件日志中出现应用程序错误时发送警报——这些异常将记录并标记我们。该方法是经过深思熟虑和讨论的决定。
每个可能的异常都经过仔细考虑并选择属于两个类别之一 - 适用于单个数据馈送且不会关闭服务(大多数)的类别,以及表明明确编程或其他从根本上使代码对所有数据馈送无用的错误。例如,我们选择在无法写入事件日志时关闭服务,因为这是我们指示数据馈送未得到处理的主要机制。异常是在本地捕获的,因为本地上下文是唯一可以做出继续决定的地方。此外,允许异常冒泡到更高级别 (1) 违反了抽象的概念,并且 (2) 在工作线程中没有意义。
我对反对处理异常的人数之多感到惊讶。如果每次 try..catch(Exception){do nothing} 我都得到一毛钱,我看到了,你将在永恒的剩余时间里得到零钱。我会争辩说 1 如果调用 .NET 框架或您自己的代码抛出异常,您需要考虑会导致该异常发生的场景并明确决定应该如何处理。我的代码在 IO 操作中捕获了 UnauthorizedExceptions,因为当我考虑如何发生时,我意识到添加新的数据馈送目录需要向服务帐户授予权限(默认情况下它不会拥有它们)。
我很欣赏建设性的意见...只是请不要用宽泛的 "this sucks" 笔刷来批评简化的示例代码。代码并不糟糕 - 它是防弹的,而且必然如此。
1 如果 Jon Skeet 不同意,我只会争论很长时间
我没有完全遵循此代码的逻辑,但您是否知道可以将任务延续和对 Wait/Result 的调用内联到当前线程?这可能会导致重入。
这个很危险,烧了很多
另外我不太明白你为什么 task
延迟开始。这是一种代码味道。另外,为什么要用 try
包装任务创建?这永远不能抛出。
这显然是一个部分答案。但是代码对我来说看起来很纠结。如果审核它这么难,您可能首先应该以不同的方式编写它。
首先,你的问题:在ContinueWith里面请求锁本身不是问题。如果你打扰你在另一个锁块内这样做 - 那就不要。您的延续将在不同的时间、不同的线程中异步执行。
现在,代码本身是有问题的。为什么要在几乎不能抛出异常的语句周围使用许多 try-catch 块?例如这里:
try {
task = new Task(() => new DatafeedUploadHandler().UploadDatafeed(this.legalOrg, datafeedFileData), TaskCreationOptions.LongRunning);
}
catch {}
你只是创建任务 - 我无法想象这什么时候可以抛出。与 ContinueWith 同样的故事。这里:
this.runningTaskDictionary.Add(task.Id, task);
你可以检查一下这个密钥是否已经存在。但即使这样也不是必需的,因为 task.Id 是您刚刚创建的给定任务实例的唯一 ID。这个:
try {
fileProperties = this.fileWatcherDictionary[eventArgs.FullPath];
}
catch (KeyNotFoundException keyNotFoundException) {}
catch (ArgumentNullException argumentNullException) {}
更糟。你不应该在这方面使用异常 - 不要捕获 KeyNotFoundException 但在 Dictionary 上使用适当的方法(如 TryGetValue)。
因此,首先,删除所有 try catch 块,或者对整个方法使用一个块,或者将它们用于真正可以抛出异常的语句,否则您将无法处理这种情况(并且您知道如何处理异常抛出)。
那么,您处理文件系统事件的方法不是很可扩展和可靠。许多程序在将更改保存到文件时会在很短的时间间隔内生成多个更改事件(也有针对同一文件的多个事件按顺序发生的其他情况)。如果您只是在每个事件上开始处理文件,这可能会导致不同类型的麻烦。因此,您可能需要限制针对给定文件的事件,并且仅在上次检测到更改后的一定延迟后才开始处理。不过,这可能有点高级。
不要忘记尽快获取文件的读锁,这样其他进程就无法在您使用文件时更改文件(例如,您可能计算文件的 md5,然后有人更改文件,然后你开始上传 - 现在你的 md5 无效)。另一种方法是记录上次写入时间和上传时间 - 获取读锁并检查文件是否未更改。
更重要的是一次可以有很多变化。假设我非常快地复制了 1000 个文件 - 你不想用 1000 个线程同时开始上传它们。您需要一个文件队列来处理,并使用多个线程从该队列中取出项目。这样一来,可能会同时发生数千个事件,而您的上传仍将可靠地进行。现在,您为每个更改事件创建新线程,并立即开始上传(根据方法名称)——这将在严重的事件负载下失败(在上述情况下)。
不,它不会烧到你。即使 ContinueWith
内联到当前线程 运行 new Task(() => new DatafeedUploadHandler()..
中,它也会获得锁,例如没有死锁。
lock
语句在内部使用 Monitor
class,它是 reentrant
。例如如果线程已经 got/owns 锁,则它可以多次获取锁。 Multithreading and Locking (Thread-Safe operations)
另一种情况是 task.ContinueWith
在 ProcessModifiedDatafeed
完成之前开始,就像你说的那样。 运行 ContinueWith
的线程只需要等待获得锁。
我真的会考虑在锁外做 task.ContinueWith
和 task.Start()
,如果你看过的话。根据您发布的代码,这是可能的。
您还应该看看 ConcurrentDictionary in the System.Collections.Concurrent
namespace. It would make the code easier and you dont have to manage the locking yourself. You are doing some kind of compare exchange/update here if (this.fileWatcherDictionary.ContainsNonNullKey(eventArgs.FullPath))
. e.g. only add if not already in the dictionary. This is one atomic operation. There is no function to do this with a ConcurrentDictionary
but there is an AddOrUpdate 方法。也许您可以使用此方法重写它。根据您的代码,您可以安全地使用 ConcurrentDictionary
至少 runningTaskDictionary
哦,TaskCreationOptions.LongRunning
实际上是为每个任务创建一个新线程,这是一种昂贵的操作。 windows 内部线程池在新的 windows 版本中是智能的,并且正在动态适应。它会 "see" 你正在做很多 IO 的事情,并且会根据需要和实际产生新的线程。
问候
Windows 服务:从目录列表生成一组 FileWatcher 对象以在配置文件中监视,具有以下要求:
- 文件处理可能很耗时 - 事件必须在它们自己的任务线程上处理
- 保留事件处理程序任务的句柄以等待 OnStop() 事件中的完成。
- 跟踪上传文件的哈希值;不一样就不要再处理
- 保留文件哈希值以允许 OnStart() 处理在服务关闭时上传的文件。
- 切勿多次处理一个文件。
(关于 #3,当没有变化时我们确实会收到事件...最值得注意的是因为 FileWatchers 的重复事件问题)
为了完成这些事情,我有两本字典 - 一本用于上传的文件,另一本用于任务本身。这两个对象都是静态的,我需要在 adding/removing/updating 个文件和任务时锁定它们。简化代码:
public sealed class TrackingFileSystemWatcher : FileSystemWatcher {
private static readonly object fileWatcherDictionaryLock = new object();
private static readonly object runningTaskDictionaryLock = new object();
private readonly Dictionary<int, Task> runningTaskDictionary = new Dictionary<int, Task>(15);
private readonly Dictionary<string, FileSystemWatcherProperties> fileWatcherDictionary = new Dictionary<string, FileSystemWatcherProperties>();
// Wired up elsewhere
private void OnChanged(object sender, FileSystemEventArgs eventArgs) {
this.ProcessModifiedDatafeed(eventArgs);
}
private void ProcessModifiedDatafeed(FileSystemEventArgs eventArgs) {
lock (TrackingFileSystemWatcher.fileWatcherDictionaryLock) {
// Read the file and generate hash here
// Properties if the file has been processed before
// ContainsNonNullKey is an extension method
if (this.fileWatcherDictionary.ContainsNonNullKey(eventArgs.FullPath)) {
try {
fileProperties = this.fileWatcherDictionary[eventArgs.FullPath];
}
catch (KeyNotFoundException keyNotFoundException) {}
catch (ArgumentNullException argumentNullException) {}
}
else {
// Create a new properties object
}
fileProperties.ChangeType = eventArgs.ChangeType;
fileProperties.FileContentsHash = md5Hash;
fileProperties.LastEventTimestamp = DateTime.Now;
Task task;
try {
task = new Task(() => new DatafeedUploadHandler().UploadDatafeed(this.legalOrg, datafeedFileData), TaskCreationOptions.LongRunning);
}
catch {
..
}
// Only lock long enough to add the task to the dictionary
lock (TrackingFileSystemWatcher.runningTaskDictionaryLock) {
try {
this.runningTaskDictionary.Add(task.Id, task);
}
catch {
..
}
}
try {
task.ContinueWith(t => {
try {
lock (TrackingFileSystemWatcher.runningTaskDictionaryLock) {
this.runningTaskDictionary.Remove(t.Id);
}
// Will this lock burn me?
lock (TrackingFileSystemWatcher.fileWatcherDictionaryLock) {
// Persist the file watcher properties to
// disk for recovery at OnStart()
}
}
catch {
..
}
});
task.Start();
}
catch {
..
}
}
}
}
如果委托是在同一对象的锁中定义的,那么在 ContinueWith()
委托中请求对 FileSystemWatcher 集合的锁有什么影响?我希望它很好,即使任务在 ProcessModifiedDatafeed()
释放锁之前开始、完成并进入 ContinueWith() ,任务线程也会被挂起,直到创建线程释放锁。但我想确保我没有踩到任何延迟执行地雷。
看看代码,我也许可以更快地释放锁,避免这个问题,但我还不确定...需要查看完整代码才能确定。
更新
为了阻止不断上升的 "this code is terrible" 评论,我有很好的理由来捕捉我所做的异常,并且我正在捕捉如此多的异常。这是一个带有多线程处理程序的 Windows 服务,它可能不会崩溃。曾经。如果这些线程中的任何一个有未处理的异常,它将执行的操作。
此外,这些例外情况将写入未来的防弹程序。我在下面的评论中给出的例子是为处理程序添加一个工厂......就像今天编写的代码一样,永远不会有空任务,但如果工厂没有正确实现,代码可能会抛出异常.是的,那应该在测试中被发现。但是,我的团队中有初级开发人员... "May. Not. Crash."(此外,如果 是 未处理的异常,它必须正常关闭,允许当前 - 运行要完成的线程 - 我们使用 main() 中设置的未处理异常处理程序来完成)。我们将企业级监视器配置为在事件日志中出现应用程序错误时发送警报——这些异常将记录并标记我们。该方法是经过深思熟虑和讨论的决定。
每个可能的异常都经过仔细考虑并选择属于两个类别之一 - 适用于单个数据馈送且不会关闭服务(大多数)的类别,以及表明明确编程或其他从根本上使代码对所有数据馈送无用的错误。例如,我们选择在无法写入事件日志时关闭服务,因为这是我们指示数据馈送未得到处理的主要机制。异常是在本地捕获的,因为本地上下文是唯一可以做出继续决定的地方。此外,允许异常冒泡到更高级别 (1) 违反了抽象的概念,并且 (2) 在工作线程中没有意义。
我对反对处理异常的人数之多感到惊讶。如果每次 try..catch(Exception){do nothing} 我都得到一毛钱,我看到了,你将在永恒的剩余时间里得到零钱。我会争辩说 1 如果调用 .NET 框架或您自己的代码抛出异常,您需要考虑会导致该异常发生的场景并明确决定应该如何处理。我的代码在 IO 操作中捕获了 UnauthorizedExceptions,因为当我考虑如何发生时,我意识到添加新的数据馈送目录需要向服务帐户授予权限(默认情况下它不会拥有它们)。
我很欣赏建设性的意见...只是请不要用宽泛的 "this sucks" 笔刷来批评简化的示例代码。代码并不糟糕 - 它是防弹的,而且必然如此。
1 如果 Jon Skeet 不同意,我只会争论很长时间
我没有完全遵循此代码的逻辑,但您是否知道可以将任务延续和对 Wait/Result 的调用内联到当前线程?这可能会导致重入。
这个很危险,烧了很多
另外我不太明白你为什么 task
延迟开始。这是一种代码味道。另外,为什么要用 try
包装任务创建?这永远不能抛出。
这显然是一个部分答案。但是代码对我来说看起来很纠结。如果审核它这么难,您可能首先应该以不同的方式编写它。
首先,你的问题:在ContinueWith里面请求锁本身不是问题。如果你打扰你在另一个锁块内这样做 - 那就不要。您的延续将在不同的时间、不同的线程中异步执行。
现在,代码本身是有问题的。为什么要在几乎不能抛出异常的语句周围使用许多 try-catch 块?例如这里:
try {
task = new Task(() => new DatafeedUploadHandler().UploadDatafeed(this.legalOrg, datafeedFileData), TaskCreationOptions.LongRunning);
}
catch {}
你只是创建任务 - 我无法想象这什么时候可以抛出。与 ContinueWith 同样的故事。这里:
this.runningTaskDictionary.Add(task.Id, task);
你可以检查一下这个密钥是否已经存在。但即使这样也不是必需的,因为 task.Id 是您刚刚创建的给定任务实例的唯一 ID。这个:
try {
fileProperties = this.fileWatcherDictionary[eventArgs.FullPath];
}
catch (KeyNotFoundException keyNotFoundException) {}
catch (ArgumentNullException argumentNullException) {}
更糟。你不应该在这方面使用异常 - 不要捕获 KeyNotFoundException 但在 Dictionary 上使用适当的方法(如 TryGetValue)。
因此,首先,删除所有 try catch 块,或者对整个方法使用一个块,或者将它们用于真正可以抛出异常的语句,否则您将无法处理这种情况(并且您知道如何处理异常抛出)。
那么,您处理文件系统事件的方法不是很可扩展和可靠。许多程序在将更改保存到文件时会在很短的时间间隔内生成多个更改事件(也有针对同一文件的多个事件按顺序发生的其他情况)。如果您只是在每个事件上开始处理文件,这可能会导致不同类型的麻烦。因此,您可能需要限制针对给定文件的事件,并且仅在上次检测到更改后的一定延迟后才开始处理。不过,这可能有点高级。
不要忘记尽快获取文件的读锁,这样其他进程就无法在您使用文件时更改文件(例如,您可能计算文件的 md5,然后有人更改文件,然后你开始上传 - 现在你的 md5 无效)。另一种方法是记录上次写入时间和上传时间 - 获取读锁并检查文件是否未更改。
更重要的是一次可以有很多变化。假设我非常快地复制了 1000 个文件 - 你不想用 1000 个线程同时开始上传它们。您需要一个文件队列来处理,并使用多个线程从该队列中取出项目。这样一来,可能会同时发生数千个事件,而您的上传仍将可靠地进行。现在,您为每个更改事件创建新线程,并立即开始上传(根据方法名称)——这将在严重的事件负载下失败(在上述情况下)。
不,它不会烧到你。即使 ContinueWith
内联到当前线程 运行 new Task(() => new DatafeedUploadHandler()..
中,它也会获得锁,例如没有死锁。
lock
语句在内部使用 Monitor
class,它是 reentrant
。例如如果线程已经 got/owns 锁,则它可以多次获取锁。 Multithreading and Locking (Thread-Safe operations)
另一种情况是 task.ContinueWith
在 ProcessModifiedDatafeed
完成之前开始,就像你说的那样。 运行 ContinueWith
的线程只需要等待获得锁。
我真的会考虑在锁外做 task.ContinueWith
和 task.Start()
,如果你看过的话。根据您发布的代码,这是可能的。
您还应该看看 ConcurrentDictionary in the System.Collections.Concurrent
namespace. It would make the code easier and you dont have to manage the locking yourself. You are doing some kind of compare exchange/update here if (this.fileWatcherDictionary.ContainsNonNullKey(eventArgs.FullPath))
. e.g. only add if not already in the dictionary. This is one atomic operation. There is no function to do this with a ConcurrentDictionary
but there is an AddOrUpdate 方法。也许您可以使用此方法重写它。根据您的代码,您可以安全地使用 ConcurrentDictionary
至少 runningTaskDictionary
哦,TaskCreationOptions.LongRunning
实际上是为每个任务创建一个新线程,这是一种昂贵的操作。 windows 内部线程池在新的 windows 版本中是智能的,并且正在动态适应。它会 "see" 你正在做很多 IO 的事情,并且会根据需要和实际产生新的线程。
问候