使用 C# 的暂停事件处理

Suspended event handling with C#

我有 2 个 classes:

public class A
{
    private const int MAXCOUNTER = 100500;
    private Thread m_thrd;
    public event Action<string> ItemStarted;
    public event Action<string> ItemFinished;

    private void OnItemStarted(string name)
    {
        if (ItemStarted != null) ItemStarted(name);
    }

    private void OnItemFinished(string name)
    {
        if (ItemFinished != null) ItemFinished(name);
    }

    public A()
    {
        m_thrd = new Thread(this.Run);
        m_thrd.Start();
    }

    private void Run()
    {
        for (int i = 0; i < MAXCOUNTER; i++)
        {
            OnItemStarted(i.ToString());
            // some long term operations
            OnItemFinished(i.ToString());
        }
    }
}

public class B
{
    private Thread m_thrd;
    private Queue<string> m_data;
    public B()
    {
        m_thrd = new Thread(this.ProcessData);
        m_thrd.Start();
    }

    public void ItemStartedHandler(string str)
    {
        m_data.Enqueue(str);
    }

    public void ItemFinishedHandler(string str)
    {
        if (m_data.Dequeue() != str)
            throw new Exception("dequeued element is not the same as finish one!");
    }

    private void ProcessData()
    {
        lock (m_data)
        {
            while (m_data.Count != 0)
            {
                var item = m_data.Peek();
                //make some long term operations on the item
            }
        }
    }
}

我们还有其他地方的代码

A a = new A();
B b = new B();
a.ItemStarted += b.ItemStartedHandler;
a.ItemFinished += b.ItemFinishedHandler;

您的代码有点混乱。

如果在 ProcessData() 完成之前引发 ItemFinished,那么您的第一个问题是竞争条件。

不用担心使用 AutoResetEvent。简单的事情是锁定对 m_data 的每个访问。所以这导致了下一个问题 - 是的,锁是必要的,而且到处都是必要的。

但你的最后一点是最重要的。您需要将每个构造函数更改为 .Start() 方法,以便在开始之前留出时间进行连接。

但是,即便如此,您仍然存在严重的竞争条件。

以下是您应该如何完成这项工作。 NuGet "Rx-Main" 将 Microsoft 的 Reactive Framework 添加到您的代码中。然后这样做:

var scheduler1 = new EventLoopScheduler();
var scheduler2 = new EventLoopScheduler();

Observable
    .Range(0, 100500)
    .ObserveOn(scheduler1)
    .Do(x => { /* some long term operations */ })
    .ObserveOn(scheduler2)
    .Subscribe(x =>
    {
        //make some long term operations on the item
    });

任务完成。

您需要更正代码中的几个线程问题:

  1. 您正在启动线程后连接事件。您应该在启动线程之前执行此操作。因此,对于 classes,将构造函数中的任何内容放入 Public 实例函数中。现在创建 classes 的实例,连接事件,然后先调用 class B 的实例方法,然后调用 class A 来启动线程。
  2. 您正在从两个不同的线程访问 class B 中的队列。队列不是线程安全的,因此您应该使用 ConcurrentQueue (https://msdn.microsoft.com/en-us/library/dd267265(v=vs.110).aspx) 和相应的 Try... 函数。您不需要使用 ConcurrentQueue 进行任何类型的显式锁定。
  3. 您使用 class B ProcessData 函数的意图不是很清楚。您是想为队列中的每个项目调用 ProcessData 一次,还是想继续调用 ProcessData 直到项目未出队?如果是前者,那么您应该使用带有 GetConsumingEnumerable 的 BlockingCollection 而不是使用 Queue。有关详细信息,请参见:https://msdn.microsoft.com/en-us/library/dd287186(v=vs.110).aspx。如果这是你想要的,我可以 post 一个样本。如果您使用 BlockingCollection,则不会出现 ProcessData 比 class 线程更早完成的情况。