使用 IEventProcessor 注册观察者
registering observers with IEventProcessor
这是取自 here:
的 IEventProcessor 实现的一部分
public class SimpleEventProcessor : IEventProcessor
{
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
{
foreach (EventData eventData in events)
{
}
}
}
随着新事件添加到 EventHub,ProcessEventsAsync 方法被调用,foreach 循环可用于处理事件。我现在想将 Observer 添加到 SimpleEventProcessor,例如使用 here 中讨论的 ObserverRegistry。提议的 ObserverRegistry 如下所示:
public class ObserverRegistry : IObserverRegistry<IProjectionWriterFactory>
{
IEnumerable<object> GetObservers(IProjectionWriterFactory factory)
{
yield return new LoanApplicationObserver();
yield return new OfferObserver();
// more observers...
}
}
不幸的是,缺少一些东西。我如何向 SimpleEventProcessor 注册多个观察者,以便将事件从 ProcessEventsAsync 传递给所有观察者并最终传递给它们的 When 方法?
完整的源代码是 here。大纲如下:
您可以在 SimpleEventProcessor 上定义一个静态事件:
public class SimpleEventProcessor : IEventProcessor
{
public static event EventHandler<MessageReceivedEventArgs> OnMessageReceived;
public SimpleEventProcessor()
{ }
}
然后在 ProcessEventsAsync 中引发 OnMessageReceived 事件:
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (EventData message in messages)
{
OnMessageReceived(this, new MessageReceivedEventArgs() { ReceivedOn = DateTimeOffset.UtcNow, Message = message });
}
}
非常重要:确保在处理器关闭时删除所有订阅者。这非常重要,因为缺少取消订阅的静态事件会导致内存泄漏 article explaining this.:
public async Task CloseAsync(PartitionContext context, CloseReason reason)
{
if (OnMessageReceived != null)
{
foreach (EventHandler<MessageReceivedEventArgs> subscriber in OnMessageReceived.GetInvocationList())
{
OnMessageReceived -= subscriber;
}
}
}
最后你可以连接观察者作为你初始化逻辑的一部分:
ObserverRegistry registry = new ObserverRegistry();
foreach (IObserver observer in registry.GetObservers())
{
SimpleEventProcessor.OnMessageReceived += new EventHandler<MessageReceivedEventArgs>(
(sender, e) => observer.When(e));
}
控制台应用程序的示例输出:
SimpleEventProcessor: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18
Observer1: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18
Observer2: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18
SimpleEventProcessor: a29d5875-7c53-4a7c-8113-ef7c24c2851f
Observer1: a29d5875-7c53-4a7c-8113-ef7c24c2851f
Observer2: a29d5875-7c53-4a7c-8113-ef7c24c2851f
我想强调以下几点:
- 在您的用例中,注册
IEventProcessorFactory
可能更有效,因为您可以更好地控制处理器实例化和处置。
- 建议使 ProcessEventsAsync 方法尽可能轻便和快速。在您的情况下,创建单独的消费者组可能是更好的选择?
希望以上能回答您的问题。
这是取自 here:
的 IEventProcessor 实现的一部分public class SimpleEventProcessor : IEventProcessor
{
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
{
foreach (EventData eventData in events)
{
}
}
}
随着新事件添加到 EventHub,ProcessEventsAsync 方法被调用,foreach 循环可用于处理事件。我现在想将 Observer 添加到 SimpleEventProcessor,例如使用 here 中讨论的 ObserverRegistry。提议的 ObserverRegistry 如下所示:
public class ObserverRegistry : IObserverRegistry<IProjectionWriterFactory>
{
IEnumerable<object> GetObservers(IProjectionWriterFactory factory)
{
yield return new LoanApplicationObserver();
yield return new OfferObserver();
// more observers...
}
}
不幸的是,缺少一些东西。我如何向 SimpleEventProcessor 注册多个观察者,以便将事件从 ProcessEventsAsync 传递给所有观察者并最终传递给它们的 When 方法?
完整的源代码是 here。大纲如下:
您可以在 SimpleEventProcessor 上定义一个静态事件:
public class SimpleEventProcessor : IEventProcessor
{
public static event EventHandler<MessageReceivedEventArgs> OnMessageReceived;
public SimpleEventProcessor()
{ }
}
然后在 ProcessEventsAsync 中引发 OnMessageReceived 事件:
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (EventData message in messages)
{
OnMessageReceived(this, new MessageReceivedEventArgs() { ReceivedOn = DateTimeOffset.UtcNow, Message = message });
}
}
非常重要:确保在处理器关闭时删除所有订阅者。这非常重要,因为缺少取消订阅的静态事件会导致内存泄漏 article explaining this.:
public async Task CloseAsync(PartitionContext context, CloseReason reason)
{
if (OnMessageReceived != null)
{
foreach (EventHandler<MessageReceivedEventArgs> subscriber in OnMessageReceived.GetInvocationList())
{
OnMessageReceived -= subscriber;
}
}
}
最后你可以连接观察者作为你初始化逻辑的一部分:
ObserverRegistry registry = new ObserverRegistry();
foreach (IObserver observer in registry.GetObservers())
{
SimpleEventProcessor.OnMessageReceived += new EventHandler<MessageReceivedEventArgs>(
(sender, e) => observer.When(e));
}
控制台应用程序的示例输出:
SimpleEventProcessor: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18
Observer1: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18
Observer2: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18
SimpleEventProcessor: a29d5875-7c53-4a7c-8113-ef7c24c2851f
Observer1: a29d5875-7c53-4a7c-8113-ef7c24c2851f
Observer2: a29d5875-7c53-4a7c-8113-ef7c24c2851f
我想强调以下几点:
- 在您的用例中,注册
IEventProcessorFactory
可能更有效,因为您可以更好地控制处理器实例化和处置。 - 建议使 ProcessEventsAsync 方法尽可能轻便和快速。在您的情况下,创建单独的消费者组可能是更好的选择?
希望以上能回答您的问题。