使用 C# 的暂停事件处理
Suspended event handling with C#
我有 2 个 classes:
public class A
{
private const int MAXCOUNTER = 100500;
private Thread m_thrd;
public event Action<string> ItemStarted;
public event Action<string> ItemFinished;
private void OnItemStarted(string name)
{
if (ItemStarted != null) ItemStarted(name);
}
private void OnItemFinished(string name)
{
if (ItemFinished != null) ItemFinished(name);
}
public A()
{
m_thrd = new Thread(this.Run);
m_thrd.Start();
}
private void Run()
{
for (int i = 0; i < MAXCOUNTER; i++)
{
OnItemStarted(i.ToString());
// some long term operations
OnItemFinished(i.ToString());
}
}
}
public class B
{
private Thread m_thrd;
private Queue<string> m_data;
public B()
{
m_thrd = new Thread(this.ProcessData);
m_thrd.Start();
}
public void ItemStartedHandler(string str)
{
m_data.Enqueue(str);
}
public void ItemFinishedHandler(string str)
{
if (m_data.Dequeue() != str)
throw new Exception("dequeued element is not the same as finish one!");
}
private void ProcessData()
{
lock (m_data)
{
while (m_data.Count != 0)
{
var item = m_data.Peek();
//make some long term operations on the item
}
}
}
}
我们还有其他地方的代码
A a = new A();
B b = new B();
a.ItemStarted += b.ItemStartedHandler;
a.ItemFinished += b.ItemFinishedHandler;
- 那么,如果
ItemFinished
在 ProcessData()
仍在工作时引发,会发生什么?
- 我应该使用
AutoResetEvent
之类的东西让 class A
等待 class B
完成 ProcessData
吗?
- 是否有必要在
ProcessData
中使用lock
?
- 可以用
m_thrd = new Thread(this.ProcessData);
调用classB
的线程吗?这里的事情让我感到困惑 - ProcessData
不会在任何 ItemStarted
事件引发之前完成(它不会导致 B
中的线程已经完成而 ItemStarted
是第一次生成)?
您的代码有点混乱。
如果在 ProcessData()
完成之前引发 ItemFinished
,那么您的第一个问题是竞争条件。
不用担心使用 AutoResetEvent
。简单的事情是锁定对 m_data
的每个访问。所以这导致了下一个问题 - 是的,锁是必要的,而且到处都是必要的。
但你的最后一点是最重要的。您需要将每个构造函数更改为 .Start()
方法,以便在开始之前留出时间进行连接。
但是,即便如此,您仍然存在严重的竞争条件。
以下是您应该如何完成这项工作。 NuGet "Rx-Main" 将 Microsoft 的 Reactive Framework 添加到您的代码中。然后这样做:
var scheduler1 = new EventLoopScheduler();
var scheduler2 = new EventLoopScheduler();
Observable
.Range(0, 100500)
.ObserveOn(scheduler1)
.Do(x => { /* some long term operations */ })
.ObserveOn(scheduler2)
.Subscribe(x =>
{
//make some long term operations on the item
});
任务完成。
您需要更正代码中的几个线程问题:
- 您正在启动线程后连接事件。您应该在启动线程之前执行此操作。因此,对于 classes,将构造函数中的任何内容放入 Public 实例函数中。现在创建 classes 的实例,连接事件,然后先调用 class B 的实例方法,然后调用 class A 来启动线程。
- 您正在从两个不同的线程访问 class B 中的队列。队列不是线程安全的,因此您应该使用 ConcurrentQueue (https://msdn.microsoft.com/en-us/library/dd267265(v=vs.110).aspx) 和相应的 Try... 函数。您不需要使用 ConcurrentQueue 进行任何类型的显式锁定。
- 您使用 class B ProcessData 函数的意图不是很清楚。您是想为队列中的每个项目调用 ProcessData 一次,还是想继续调用 ProcessData 直到项目未出队?如果是前者,那么您应该使用带有 GetConsumingEnumerable 的 BlockingCollection 而不是使用 Queue。有关详细信息,请参见:https://msdn.microsoft.com/en-us/library/dd287186(v=vs.110).aspx。如果这是你想要的,我可以 post 一个样本。如果您使用 BlockingCollection,则不会出现 ProcessData 比 class 线程更早完成的情况。
我有 2 个 classes:
public class A
{
private const int MAXCOUNTER = 100500;
private Thread m_thrd;
public event Action<string> ItemStarted;
public event Action<string> ItemFinished;
private void OnItemStarted(string name)
{
if (ItemStarted != null) ItemStarted(name);
}
private void OnItemFinished(string name)
{
if (ItemFinished != null) ItemFinished(name);
}
public A()
{
m_thrd = new Thread(this.Run);
m_thrd.Start();
}
private void Run()
{
for (int i = 0; i < MAXCOUNTER; i++)
{
OnItemStarted(i.ToString());
// some long term operations
OnItemFinished(i.ToString());
}
}
}
public class B
{
private Thread m_thrd;
private Queue<string> m_data;
public B()
{
m_thrd = new Thread(this.ProcessData);
m_thrd.Start();
}
public void ItemStartedHandler(string str)
{
m_data.Enqueue(str);
}
public void ItemFinishedHandler(string str)
{
if (m_data.Dequeue() != str)
throw new Exception("dequeued element is not the same as finish one!");
}
private void ProcessData()
{
lock (m_data)
{
while (m_data.Count != 0)
{
var item = m_data.Peek();
//make some long term operations on the item
}
}
}
}
我们还有其他地方的代码
A a = new A();
B b = new B();
a.ItemStarted += b.ItemStartedHandler;
a.ItemFinished += b.ItemFinishedHandler;
- 那么,如果
ItemFinished
在ProcessData()
仍在工作时引发,会发生什么? - 我应该使用
AutoResetEvent
之类的东西让 classA
等待 classB
完成ProcessData
吗? - 是否有必要在
ProcessData
中使用lock
? - 可以用
m_thrd = new Thread(this.ProcessData);
调用classB
的线程吗?这里的事情让我感到困惑 -ProcessData
不会在任何ItemStarted
事件引发之前完成(它不会导致B
中的线程已经完成而ItemStarted
是第一次生成)?
您的代码有点混乱。
如果在 ProcessData()
完成之前引发 ItemFinished
,那么您的第一个问题是竞争条件。
不用担心使用 AutoResetEvent
。简单的事情是锁定对 m_data
的每个访问。所以这导致了下一个问题 - 是的,锁是必要的,而且到处都是必要的。
但你的最后一点是最重要的。您需要将每个构造函数更改为 .Start()
方法,以便在开始之前留出时间进行连接。
但是,即便如此,您仍然存在严重的竞争条件。
以下是您应该如何完成这项工作。 NuGet "Rx-Main" 将 Microsoft 的 Reactive Framework 添加到您的代码中。然后这样做:
var scheduler1 = new EventLoopScheduler();
var scheduler2 = new EventLoopScheduler();
Observable
.Range(0, 100500)
.ObserveOn(scheduler1)
.Do(x => { /* some long term operations */ })
.ObserveOn(scheduler2)
.Subscribe(x =>
{
//make some long term operations on the item
});
任务完成。
您需要更正代码中的几个线程问题:
- 您正在启动线程后连接事件。您应该在启动线程之前执行此操作。因此,对于 classes,将构造函数中的任何内容放入 Public 实例函数中。现在创建 classes 的实例,连接事件,然后先调用 class B 的实例方法,然后调用 class A 来启动线程。
- 您正在从两个不同的线程访问 class B 中的队列。队列不是线程安全的,因此您应该使用 ConcurrentQueue (https://msdn.microsoft.com/en-us/library/dd267265(v=vs.110).aspx) 和相应的 Try... 函数。您不需要使用 ConcurrentQueue 进行任何类型的显式锁定。
- 您使用 class B ProcessData 函数的意图不是很清楚。您是想为队列中的每个项目调用 ProcessData 一次,还是想继续调用 ProcessData 直到项目未出队?如果是前者,那么您应该使用带有 GetConsumingEnumerable 的 BlockingCollection 而不是使用 Queue。有关详细信息,请参见:https://msdn.microsoft.com/en-us/library/dd287186(v=vs.110).aspx。如果这是你想要的,我可以 post 一个样本。如果您使用 BlockingCollection,则不会出现 ProcessData 比 class 线程更早完成的情况。