C# 锁定线程问题
C# Lock Threading Issue
这里的任务很简单(或者我认为...)。我想用要执行的方法填充一个队列(所有这些都会 return 一个对象结果),然后我想从这个队列中拉出一些任意数量的线程,执行这些方法,然后添加结果到其他一些集合(在本例中为字典),当所有工作完成后,它将被 returned。将在主线程中调用一个 main 方法,该方法将开始处理并应该阻塞,直到所有线程完成他们正在做的任何事情并且 return 收集结果。所以我把这个 class:
public class BackgroundWorkManager
{
public delegate object ThreadTask();
private Thread[] workers;
private ManualResetEvent workerThreadMre;
private ManualResetEvent mainThreadMre;
private Queue<WorkItem> workQueue;
private Dictionary<string, object> results;
private object writeLock;
private int activeTasks;
private struct WorkItem
{
public string name;
public ThreadTask task;
public WorkItem(string name, ThreadTask task)
{
this.name = name;
this.task = task;
}
}
private void workMethod()
{
while (true)
{
workerThreadMre.WaitOne();
WorkItem task;
lock (workQueue)
{
if (workQueue.Count == 0)
{
workerThreadMre.Reset();
continue;
}
task = workQueue.Dequeue();
}
object result = task.task();
lock (writeLock)
{
results.Add(task.name, result);
activeTasks--;
if (activeTasks == 0)
mainThreadMre.Set();
}
}
}
public BackgroundWorkManager()
{
workers = new Thread[Environment.ProcessorCount];
workerThreadMre = new ManualResetEvent(false);
mainThreadMre = new ManualResetEvent(false);
workQueue = new Queue<WorkItem>();
writeLock = new object();
activeTasks = 0;
for (int i = 0; i < Environment.ProcessorCount; i++)
{
workers[i] = new Thread(workMethod);
workers[i].Priority = ThreadPriority.Highest;
workers[i].Start();
}
}
public void addTask(string name, ThreadTask task)
{
workQueue.Enqueue(new WorkItem(name, task));
}
public Dictionary<string, object> process()
{
results = new Dictionary<string, object>();
activeTasks = workQueue.Count;
mainThreadMre.Reset();
workerThreadMre.Set();
mainThreadMre.WaitOne();
workerThreadMre.Reset();
return results;
}
}
如果我使用该对象一次来处理方法队列,这会很好用,但如果我尝试这样的事情
BackgroundWorkManager manager = new BackgroundWorkManager();
for (int i = 0; i < 20; i++)
{
manager.addTask("result1", (BackgroundWorkManager.ThreadTask)delegate
{
return (object)(1);
});
manager.process();
}
东西坏了。我要么陷入僵局,要么遇到异常,说我正在写入结果的字典已经包含密钥(但 Visual Studio 调试器说它是空的)。在 work 方法中添加 'Thread.Sleep(1)' 似乎可以修复它,这很奇怪。这是我第一次使用线程,所以我不确定我是在滥用锁还是什么。如果有人能对我做错了什么提供一些见解,将不胜感激。
关于如何使用生产者-消费者模式,有很多选择。例如,您可以通过使用 ActionBlock<T>
(它是 TPL Dataflow
的一部分)来极大地简化您的代码:
var concurrentDictionary = new ConcurrentDictionary<string, object>();
ActionBlock<Func<object>> actionBlock = new ActionBlock<Func<object>>((func) =>
{
var obj = func();
concurrentDictionary.AddOrUpdate("someKey", obj, (s,o) => o);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism =
Environment.ProcessorCount });
然后 post 你的代表:
foreach (var task in tasks)
{
actionBlock.Post(() => (object) 1);
}
并行版本 class:
List<Func<object>> actions = new List<Func<object>>();
actions.Add(delegate { return (object)(1); });
actions.Add(delegate { return (object)(1); });
actions.Add(delegate { return (object)(1); });
Dictionary<string, object> results = new Dictionary<string,object>();
Parallel.ForEach(actions,(f)=> {
lock (results)
{
results.Add(Guid.NewGuid().ToString(), f());
}
});
这里的任务很简单(或者我认为...)。我想用要执行的方法填充一个队列(所有这些都会 return 一个对象结果),然后我想从这个队列中拉出一些任意数量的线程,执行这些方法,然后添加结果到其他一些集合(在本例中为字典),当所有工作完成后,它将被 returned。将在主线程中调用一个 main 方法,该方法将开始处理并应该阻塞,直到所有线程完成他们正在做的任何事情并且 return 收集结果。所以我把这个 class:
public class BackgroundWorkManager
{
public delegate object ThreadTask();
private Thread[] workers;
private ManualResetEvent workerThreadMre;
private ManualResetEvent mainThreadMre;
private Queue<WorkItem> workQueue;
private Dictionary<string, object> results;
private object writeLock;
private int activeTasks;
private struct WorkItem
{
public string name;
public ThreadTask task;
public WorkItem(string name, ThreadTask task)
{
this.name = name;
this.task = task;
}
}
private void workMethod()
{
while (true)
{
workerThreadMre.WaitOne();
WorkItem task;
lock (workQueue)
{
if (workQueue.Count == 0)
{
workerThreadMre.Reset();
continue;
}
task = workQueue.Dequeue();
}
object result = task.task();
lock (writeLock)
{
results.Add(task.name, result);
activeTasks--;
if (activeTasks == 0)
mainThreadMre.Set();
}
}
}
public BackgroundWorkManager()
{
workers = new Thread[Environment.ProcessorCount];
workerThreadMre = new ManualResetEvent(false);
mainThreadMre = new ManualResetEvent(false);
workQueue = new Queue<WorkItem>();
writeLock = new object();
activeTasks = 0;
for (int i = 0; i < Environment.ProcessorCount; i++)
{
workers[i] = new Thread(workMethod);
workers[i].Priority = ThreadPriority.Highest;
workers[i].Start();
}
}
public void addTask(string name, ThreadTask task)
{
workQueue.Enqueue(new WorkItem(name, task));
}
public Dictionary<string, object> process()
{
results = new Dictionary<string, object>();
activeTasks = workQueue.Count;
mainThreadMre.Reset();
workerThreadMre.Set();
mainThreadMre.WaitOne();
workerThreadMre.Reset();
return results;
}
}
如果我使用该对象一次来处理方法队列,这会很好用,但如果我尝试这样的事情
BackgroundWorkManager manager = new BackgroundWorkManager();
for (int i = 0; i < 20; i++)
{
manager.addTask("result1", (BackgroundWorkManager.ThreadTask)delegate
{
return (object)(1);
});
manager.process();
}
东西坏了。我要么陷入僵局,要么遇到异常,说我正在写入结果的字典已经包含密钥(但 Visual Studio 调试器说它是空的)。在 work 方法中添加 'Thread.Sleep(1)' 似乎可以修复它,这很奇怪。这是我第一次使用线程,所以我不确定我是在滥用锁还是什么。如果有人能对我做错了什么提供一些见解,将不胜感激。
关于如何使用生产者-消费者模式,有很多选择。例如,您可以通过使用 ActionBlock<T>
(它是 TPL Dataflow
的一部分)来极大地简化您的代码:
var concurrentDictionary = new ConcurrentDictionary<string, object>();
ActionBlock<Func<object>> actionBlock = new ActionBlock<Func<object>>((func) =>
{
var obj = func();
concurrentDictionary.AddOrUpdate("someKey", obj, (s,o) => o);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism =
Environment.ProcessorCount });
然后 post 你的代表:
foreach (var task in tasks)
{
actionBlock.Post(() => (object) 1);
}
并行版本 class:
List<Func<object>> actions = new List<Func<object>>();
actions.Add(delegate { return (object)(1); });
actions.Add(delegate { return (object)(1); });
actions.Add(delegate { return (object)(1); });
Dictionary<string, object> results = new Dictionary<string,object>();
Parallel.ForEach(actions,(f)=> {
lock (results)
{
results.Add(Guid.NewGuid().ToString(), f());
}
});