将工作推给观察员

push work to observers

我有一个侦听器,它以 IPayload 的形式接收工作。倾听者应该将这项工作推给真正做这项工作的观察者。这是我第一次粗略地尝试实现这一目标:

public interface IObserver
{
    void DoWork(IPayload payload);
}

public interface IObservable
{
    void RegisterObserver(IObserver observer);
    void RemoveObserver(IObserver observer);
    void NotifyObservers(IPayload payload);
}

public class Observer : IObserver
{
    public void DoWork(IPayload payload)
    {
        // do some heavy lifting
    }
}

public class Listener : IObservable
{
    private readonly List<IObserver> _observers = new List<IObserver>();

    public void PushIncomingPayLoad(IPayload payload)
    {
        NotifyObservers(payload);
    }

    public void RegisterObserver(IObserver observer)
    {
        _observers.Add(observer);
    }

    public void RemoveObserver(IObserver observer)
    {
        _observers.Remove(observer);
    }

    public void NotifyObservers(IPayload payload)
    {
        Parallel.ForEach(_observers, observer =>
        {
        observer.DoWork(payload);
        });
    }
}

这是遵循 observer/observable 模式(即 pub sub?)的有效方法吗?我的理解是,NotifyObservers 还会对每个有效载荷产生威胁。这个对吗?非常欢迎任何改进建议。

请注意,所有观察者都必须完成他们的工作,然后才能将有效负载形式的新工作传递给他们 - 'observation' 的顺序无关紧要。基本上,听众必须像主人一样行事,同时尽可能多地使用 TPL 来利用主机的核心。恕我直言,这需要观察者明确注册 listener/Observable.

PS:

我认为 Parallel.ForEach 不会为每个观察者创建一个线程:Why isn't Parallel.ForEach running multiple threads? 如果这是真的,我如何确保为每个观察者创建一个线程?

我想到的替代方案是:

public async void NotifyObservers(IPayload payload)
{
    foreach (var observer in _observers)
    {
    var observer1 = observer;
    await Task.Run(() => observer1.DoWork(payload));
    }
    await Task.WhenAll();
}

当然你可以这样做,但在 .net 中,如果你不想重新发明轮子,则不需要这样做:-) 在 c# 中,这可以使用事件来完成。

一个简短的例子:

  //Your Listener who has a public eventhandler where others can add them as listeners
  public class Listener{
      //your eventhandler where others "add" them as listeners
      public event EventHandler<PayLoadEventsArgs> IncomingPayload;

      //a method where you process new data and want to notify the others
      public void PushIncomingPayLoad(IPayload payload)
      {
          //check if there are any listeners
          if(IncomingPayload != null)
              //if so, then notify them with the data in the PayloadEventArgs
              IncomingPayload(this, new PayloadEventArgs(payload));
      }
  }  

  //Your EventArgs class to hold the data
  public class PayloadEventArgs : EventArgs{

      Payload payload { get; private set; }  

      public PayloadEventArgs(Payload payload){
          this.payload = payload;
      }
  }

  public class Worker{
      //add this instance as a observer
      YourListenerInstance.IncomingPayload += DoWork;

      //remove this instance 
      YourListenerInstance.IncomingPayload -= DoWork;

      //This method gets called when the Listener notifies the  IncomingPayload listeners
      void DoWork(Object sender, PayloadEventArgs e){
          Console.WriteLine(e.payload);
      }
   }

编辑:由于问题要求并行执行,在订阅者端执行新线程怎么样?我认为这是实现这一目标的最简单方法。

//Inside the DoWork method of the subscriber start a new thread
Task.Factory.StartNew( () => 
{
      //Do your work here
});

//If you want to make sure that a new thread is used for the task, then add the TaskCreationOptions.LongRunning parameter
Task.Factory.StartNew( () => 
{
      //Do your work here
}, TaskCreationOptions.LongRunning);

希望这能回答您的问题?如果没有,请发表评论。