将工作推给观察员
push work to observers
我有一个侦听器,它以 IPayload 的形式接收工作。倾听者应该将这项工作推给真正做这项工作的观察者。这是我第一次粗略地尝试实现这一目标:
public interface IObserver
{
void DoWork(IPayload payload);
}
public interface IObservable
{
void RegisterObserver(IObserver observer);
void RemoveObserver(IObserver observer);
void NotifyObservers(IPayload payload);
}
public class Observer : IObserver
{
public void DoWork(IPayload payload)
{
// do some heavy lifting
}
}
public class Listener : IObservable
{
private readonly List<IObserver> _observers = new List<IObserver>();
public void PushIncomingPayLoad(IPayload payload)
{
NotifyObservers(payload);
}
public void RegisterObserver(IObserver observer)
{
_observers.Add(observer);
}
public void RemoveObserver(IObserver observer)
{
_observers.Remove(observer);
}
public void NotifyObservers(IPayload payload)
{
Parallel.ForEach(_observers, observer =>
{
observer.DoWork(payload);
});
}
}
这是遵循 observer/observable 模式(即 pub sub?)的有效方法吗?我的理解是,NotifyObservers 还会对每个有效载荷产生威胁。这个对吗?非常欢迎任何改进建议。
请注意,所有观察者都必须完成他们的工作,然后才能将有效负载形式的新工作传递给他们 - 'observation' 的顺序无关紧要。基本上,听众必须像主人一样行事,同时尽可能多地使用 TPL 来利用主机的核心。恕我直言,这需要观察者明确注册 listener/Observable.
PS:
我认为 Parallel.ForEach 不会为每个观察者创建一个线程:Why isn't Parallel.ForEach running multiple threads? 如果这是真的,我如何确保为每个观察者创建一个线程?
我想到的替代方案是:
public async void NotifyObservers(IPayload payload)
{
foreach (var observer in _observers)
{
var observer1 = observer;
await Task.Run(() => observer1.DoWork(payload));
}
await Task.WhenAll();
}
当然你可以这样做,但在 .net 中,如果你不想重新发明轮子,则不需要这样做:-)
在 c# 中,这可以使用事件来完成。
一个简短的例子:
//Your Listener who has a public eventhandler where others can add them as listeners
public class Listener{
//your eventhandler where others "add" them as listeners
public event EventHandler<PayLoadEventsArgs> IncomingPayload;
//a method where you process new data and want to notify the others
public void PushIncomingPayLoad(IPayload payload)
{
//check if there are any listeners
if(IncomingPayload != null)
//if so, then notify them with the data in the PayloadEventArgs
IncomingPayload(this, new PayloadEventArgs(payload));
}
}
//Your EventArgs class to hold the data
public class PayloadEventArgs : EventArgs{
Payload payload { get; private set; }
public PayloadEventArgs(Payload payload){
this.payload = payload;
}
}
public class Worker{
//add this instance as a observer
YourListenerInstance.IncomingPayload += DoWork;
//remove this instance
YourListenerInstance.IncomingPayload -= DoWork;
//This method gets called when the Listener notifies the IncomingPayload listeners
void DoWork(Object sender, PayloadEventArgs e){
Console.WriteLine(e.payload);
}
}
编辑:由于问题要求并行执行,在订阅者端执行新线程怎么样?我认为这是实现这一目标的最简单方法。
//Inside the DoWork method of the subscriber start a new thread
Task.Factory.StartNew( () =>
{
//Do your work here
});
//If you want to make sure that a new thread is used for the task, then add the TaskCreationOptions.LongRunning parameter
Task.Factory.StartNew( () =>
{
//Do your work here
}, TaskCreationOptions.LongRunning);
希望这能回答您的问题?如果没有,请发表评论。
我有一个侦听器,它以 IPayload 的形式接收工作。倾听者应该将这项工作推给真正做这项工作的观察者。这是我第一次粗略地尝试实现这一目标:
public interface IObserver
{
void DoWork(IPayload payload);
}
public interface IObservable
{
void RegisterObserver(IObserver observer);
void RemoveObserver(IObserver observer);
void NotifyObservers(IPayload payload);
}
public class Observer : IObserver
{
public void DoWork(IPayload payload)
{
// do some heavy lifting
}
}
public class Listener : IObservable
{
private readonly List<IObserver> _observers = new List<IObserver>();
public void PushIncomingPayLoad(IPayload payload)
{
NotifyObservers(payload);
}
public void RegisterObserver(IObserver observer)
{
_observers.Add(observer);
}
public void RemoveObserver(IObserver observer)
{
_observers.Remove(observer);
}
public void NotifyObservers(IPayload payload)
{
Parallel.ForEach(_observers, observer =>
{
observer.DoWork(payload);
});
}
}
这是遵循 observer/observable 模式(即 pub sub?)的有效方法吗?我的理解是,NotifyObservers 还会对每个有效载荷产生威胁。这个对吗?非常欢迎任何改进建议。
请注意,所有观察者都必须完成他们的工作,然后才能将有效负载形式的新工作传递给他们 - 'observation' 的顺序无关紧要。基本上,听众必须像主人一样行事,同时尽可能多地使用 TPL 来利用主机的核心。恕我直言,这需要观察者明确注册 listener/Observable.
PS:
我认为 Parallel.ForEach 不会为每个观察者创建一个线程:Why isn't Parallel.ForEach running multiple threads? 如果这是真的,我如何确保为每个观察者创建一个线程?
我想到的替代方案是:
public async void NotifyObservers(IPayload payload)
{
foreach (var observer in _observers)
{
var observer1 = observer;
await Task.Run(() => observer1.DoWork(payload));
}
await Task.WhenAll();
}
当然你可以这样做,但在 .net 中,如果你不想重新发明轮子,则不需要这样做:-) 在 c# 中,这可以使用事件来完成。
一个简短的例子:
//Your Listener who has a public eventhandler where others can add them as listeners
public class Listener{
//your eventhandler where others "add" them as listeners
public event EventHandler<PayLoadEventsArgs> IncomingPayload;
//a method where you process new data and want to notify the others
public void PushIncomingPayLoad(IPayload payload)
{
//check if there are any listeners
if(IncomingPayload != null)
//if so, then notify them with the data in the PayloadEventArgs
IncomingPayload(this, new PayloadEventArgs(payload));
}
}
//Your EventArgs class to hold the data
public class PayloadEventArgs : EventArgs{
Payload payload { get; private set; }
public PayloadEventArgs(Payload payload){
this.payload = payload;
}
}
public class Worker{
//add this instance as a observer
YourListenerInstance.IncomingPayload += DoWork;
//remove this instance
YourListenerInstance.IncomingPayload -= DoWork;
//This method gets called when the Listener notifies the IncomingPayload listeners
void DoWork(Object sender, PayloadEventArgs e){
Console.WriteLine(e.payload);
}
}
编辑:由于问题要求并行执行,在订阅者端执行新线程怎么样?我认为这是实现这一目标的最简单方法。
//Inside the DoWork method of the subscriber start a new thread
Task.Factory.StartNew( () =>
{
//Do your work here
});
//If you want to make sure that a new thread is used for the task, then add the TaskCreationOptions.LongRunning parameter
Task.Factory.StartNew( () =>
{
//Do your work here
}, TaskCreationOptions.LongRunning);
希望这能回答您的问题?如果没有,请发表评论。