简单的内存消息队列
Simple in-memory message queue
我们现有的域事件实现限制(通过阻塞)一次发布到一个线程,以避免对处理程序的重入调用:
public interface IDomainEvent {} // Marker interface
public class Dispatcher : IDisposable
{
private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
// Subscribe code...
public void Publish(IDomainEvent domainEvent)
{
semaphore.Wait();
try
{
// Get event subscriber(s) from concurrent dictionary...
foreach (Action<IDomainEvent> subscriber in eventSubscribers)
{
subscriber(domainEvent);
}
}
finally
{
semaphore.Release();
}
}
// Dispose pattern...
}
如果处理程序发布事件,这将死锁。
如何重写它以序列化对 Publish
的调用?换句话说,如果订阅处理程序 A 发布事件 B,我将得到:
- 处理程序 A 已调用
- 处理程序 B 已调用
同时在多线程环境中保留对处理程序的不可重入调用的条件。
我不想更改 public 方法签名;例如,应用程序中没有地方可以调用方法来发布队列。
用那个界面做不到。您可以异步处理事件订阅以消除死锁,同时仍然 运行 它们是串行的,但是您不能保证您描述的顺序。在事件 A 的处理程序 运行 但在它发布事件 B 之前,对 Publish 的另一个调用可能会使某些东西(事件 C)入队。然后事件 B 在队列中排在事件 C 之后。
只要处理程序 A 在获取队列中的项目时与其他客户端处于平等地位,它要么必须像其他人一样等待(死锁),要么必须公平地进行(先到先得)服务)。您那里的界面不允许对两者进行区别对待。
这并不是说您不能在您的逻辑中使用一些恶作剧来尝试区分它们(例如,基于线程 ID 或其他可识别的东西),但如果您不这样做,那么沿着这些思路的任何东西都是不可靠的也控制订阅者代码。
您必须使发布异步才能实现。天真的实现会像这样简单:
public class Dispatcher : IDisposable {
private readonly BlockingCollection<IDomainEvent> _queue = new BlockingCollection<IDomainEvent>(new ConcurrentQueue<IDomainEvent>());
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
public Dispatcher() {
new Thread(Consume) {
IsBackground = true
}.Start();
}
private List<Action<IDomainEvent>> _subscribers = new List<Action<IDomainEvent>>();
public void AddSubscriber(Action<IDomainEvent> sub) {
_subscribers.Add(sub);
}
private void Consume() {
try {
foreach (var @event in _queue.GetConsumingEnumerable(_cts.Token)) {
try {
foreach (Action<IDomainEvent> subscriber in _subscribers) {
subscriber(@event);
}
}
catch (Exception ex) {
// log, handle
}
}
}
catch (OperationCanceledException) {
// expected
}
}
public void Publish(IDomainEvent domainEvent) {
_queue.Add(domainEvent);
}
public void Dispose() {
_cts.Cancel();
}
}
我们想出了一个同步的方法。
public class Dispatcher : IDisposable
{
private readonly ConcurrentQueue<IDomainEvent> queue = new ConcurrentQueue<IDomainEvent>();
private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
// Subscribe code...
public void Publish(IDomainEvent domainEvent)
{
queue.Enqueue(domainEvent);
if (IsPublishing)
{
return;
}
PublishQueue();
}
private void PublishQueue()
{
IDomainEvent domainEvent;
while (queue.TryDequeue(out domainEvent))
{
InternalPublish(domainEvent);
}
}
private void InternalPublish(IDomainEvent domainEvent)
{
semaphore.Wait();
try
{
// Get event subscriber(s) from concurrent dictionary...
foreach (Action<IDomainEvent> subscriber in eventSubscribers)
{
subscriber(domainEvent);
}
}
finally
{
semaphore.Release();
}
// Necessary, as calls to Publish during publishing could have queued events and returned.
PublishQueue();
}
private bool IsPublishing
{
get { return semaphore.CurrentCount < 1; }
}
// Dispose pattern for semaphore...
}
}
我们现有的域事件实现限制(通过阻塞)一次发布到一个线程,以避免对处理程序的重入调用:
public interface IDomainEvent {} // Marker interface
public class Dispatcher : IDisposable
{
private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
// Subscribe code...
public void Publish(IDomainEvent domainEvent)
{
semaphore.Wait();
try
{
// Get event subscriber(s) from concurrent dictionary...
foreach (Action<IDomainEvent> subscriber in eventSubscribers)
{
subscriber(domainEvent);
}
}
finally
{
semaphore.Release();
}
}
// Dispose pattern...
}
如果处理程序发布事件,这将死锁。
如何重写它以序列化对 Publish
的调用?换句话说,如果订阅处理程序 A 发布事件 B,我将得到:
- 处理程序 A 已调用
- 处理程序 B 已调用
同时在多线程环境中保留对处理程序的不可重入调用的条件。
我不想更改 public 方法签名;例如,应用程序中没有地方可以调用方法来发布队列。
用那个界面做不到。您可以异步处理事件订阅以消除死锁,同时仍然 运行 它们是串行的,但是您不能保证您描述的顺序。在事件 A 的处理程序 运行 但在它发布事件 B 之前,对 Publish 的另一个调用可能会使某些东西(事件 C)入队。然后事件 B 在队列中排在事件 C 之后。
只要处理程序 A 在获取队列中的项目时与其他客户端处于平等地位,它要么必须像其他人一样等待(死锁),要么必须公平地进行(先到先得)服务)。您那里的界面不允许对两者进行区别对待。
这并不是说您不能在您的逻辑中使用一些恶作剧来尝试区分它们(例如,基于线程 ID 或其他可识别的东西),但如果您不这样做,那么沿着这些思路的任何东西都是不可靠的也控制订阅者代码。
您必须使发布异步才能实现。天真的实现会像这样简单:
public class Dispatcher : IDisposable {
private readonly BlockingCollection<IDomainEvent> _queue = new BlockingCollection<IDomainEvent>(new ConcurrentQueue<IDomainEvent>());
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
public Dispatcher() {
new Thread(Consume) {
IsBackground = true
}.Start();
}
private List<Action<IDomainEvent>> _subscribers = new List<Action<IDomainEvent>>();
public void AddSubscriber(Action<IDomainEvent> sub) {
_subscribers.Add(sub);
}
private void Consume() {
try {
foreach (var @event in _queue.GetConsumingEnumerable(_cts.Token)) {
try {
foreach (Action<IDomainEvent> subscriber in _subscribers) {
subscriber(@event);
}
}
catch (Exception ex) {
// log, handle
}
}
}
catch (OperationCanceledException) {
// expected
}
}
public void Publish(IDomainEvent domainEvent) {
_queue.Add(domainEvent);
}
public void Dispose() {
_cts.Cancel();
}
}
我们想出了一个同步的方法。
public class Dispatcher : IDisposable
{
private readonly ConcurrentQueue<IDomainEvent> queue = new ConcurrentQueue<IDomainEvent>();
private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
// Subscribe code...
public void Publish(IDomainEvent domainEvent)
{
queue.Enqueue(domainEvent);
if (IsPublishing)
{
return;
}
PublishQueue();
}
private void PublishQueue()
{
IDomainEvent domainEvent;
while (queue.TryDequeue(out domainEvent))
{
InternalPublish(domainEvent);
}
}
private void InternalPublish(IDomainEvent domainEvent)
{
semaphore.Wait();
try
{
// Get event subscriber(s) from concurrent dictionary...
foreach (Action<IDomainEvent> subscriber in eventSubscribers)
{
subscriber(domainEvent);
}
}
finally
{
semaphore.Release();
}
// Necessary, as calls to Publish during publishing could have queued events and returned.
PublishQueue();
}
private bool IsPublishing
{
get { return semaphore.CurrentCount < 1; }
}
// Dispose pattern for semaphore...
}
}