使用 Reactive X 流处理来自事件中心的数据
Processing data from Event Hubs with Reactive X streams
我想使用 Rx .net 扩展来处理来自 Azure 事件中心的事件。
如何根据从 EventProcessorHost 获得的消息创建可观察流?
我没有找到这个场景的参考,我是否遗漏了一些基本的东西?我是在尝试做一些没有意义的事情吗?
是否有意义取决于你。你为什么 need/want 使用 Reactive Extensions?大多数场景涉及使用 Azure Stream Analytics 近乎实时地转换和查询数据。
但是使用 EventProcessor 来处理数据是可以做到的,一个非常粗略的草图可以帮助您入门:
public class EventProcessor : IEventProcessor
{
private readonly EventStreamProcessor eventStreamProcessor;
public EventProcessor(EventStreamProcessor eventStreamProcessor)
{
this.eventStreamProcessor = eventStreamProcessor;
}
public Task OpenAsync(PartitionContext context)
{
return Task.CompletedTask;
}
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> eventDatas)
{
foreach(var eventData in eventDatas)
eventStreamProcessor.Post(eventData);
return Task.CompletedTask;
}
public Task CloseAsync(PartitionContext context, CloseReason reason)
{
return Task.CompletedTask;
}
}
public sealed class EventStreamProcessor : IDisposable
{
private Subject<EventData> dataStream = new Subject<EventData>();
private readonly IDisposable subscription;
public EventStreamProcessor()
{
subscription = dataStream
.Synchronize()
.AsObservable()
.Subscribe((evenData) => {
// Do something
})
}
public void Dispose()
{
dataStream.OnCompleted();
subscription.Dispose();
}
public void Post(EventData eventData)
{
dataStream.OnNext(eventData);
}
}
需要考虑的几件事:
- 可靠的检查点将变得困难。您在将数据推送到 RX 流后的任何时刻调用
await context.CheckpointAsync();
,但您不知道 RX 管道已经处理了数据。
- 多个 EventProcessor 可以在任何给定时刻处于活动状态,因此请确保将数据推送到单个 RX 流。使用单例或使用
IEventProcessorFactory
接口 的实现将相同的 RX 处理器注入每个 EventProcessor
我认为这个 的答案也与您相关。
有事件中心的实现作为 Akka.NET 反应流 Azure EventHub adapter 的来源。
我想使用 Rx .net 扩展来处理来自 Azure 事件中心的事件。
如何根据从 EventProcessorHost 获得的消息创建可观察流?
我没有找到这个场景的参考,我是否遗漏了一些基本的东西?我是在尝试做一些没有意义的事情吗?
是否有意义取决于你。你为什么 need/want 使用 Reactive Extensions?大多数场景涉及使用 Azure Stream Analytics 近乎实时地转换和查询数据。
但是使用 EventProcessor 来处理数据是可以做到的,一个非常粗略的草图可以帮助您入门:
public class EventProcessor : IEventProcessor
{
private readonly EventStreamProcessor eventStreamProcessor;
public EventProcessor(EventStreamProcessor eventStreamProcessor)
{
this.eventStreamProcessor = eventStreamProcessor;
}
public Task OpenAsync(PartitionContext context)
{
return Task.CompletedTask;
}
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> eventDatas)
{
foreach(var eventData in eventDatas)
eventStreamProcessor.Post(eventData);
return Task.CompletedTask;
}
public Task CloseAsync(PartitionContext context, CloseReason reason)
{
return Task.CompletedTask;
}
}
public sealed class EventStreamProcessor : IDisposable
{
private Subject<EventData> dataStream = new Subject<EventData>();
private readonly IDisposable subscription;
public EventStreamProcessor()
{
subscription = dataStream
.Synchronize()
.AsObservable()
.Subscribe((evenData) => {
// Do something
})
}
public void Dispose()
{
dataStream.OnCompleted();
subscription.Dispose();
}
public void Post(EventData eventData)
{
dataStream.OnNext(eventData);
}
}
需要考虑的几件事:
- 可靠的检查点将变得困难。您在将数据推送到 RX 流后的任何时刻调用
await context.CheckpointAsync();
,但您不知道 RX 管道已经处理了数据。 - 多个 EventProcessor 可以在任何给定时刻处于活动状态,因此请确保将数据推送到单个 RX 流。使用单例或使用
IEventProcessorFactory
接口 的实现将相同的 RX 处理器注入每个 EventProcessor
我认为这个
有事件中心的实现作为 Akka.NET 反应流 Azure EventHub adapter 的来源。