使用 Reactive X 流处理来自事件中心的数据

Processing data from Event Hubs with Reactive X streams

我想使用 Rx .net 扩展来处理来自 Azure 事件中心的事件。

如何根据从 EventProcessorHost 获得的消息创建可观察流?

我没有找到这个场景的参考,我是否遗漏了一些基本的东西?我是在尝试做一些没有意义的事情吗?

是否有意义取决于你。你为什么 need/want 使用 Reactive Extensions?大多数场景涉及使用 Azure Stream Analytics 近乎实时地转换和查询数据。

但是使用 EventProcessor 来处理数据是可以做到的,一个非常粗略的草图可以帮助您入门:

public class EventProcessor : IEventProcessor
{
    private readonly EventStreamProcessor eventStreamProcessor;

    public EventProcessor(EventStreamProcessor eventStreamProcessor)
    {
        this.eventStreamProcessor = eventStreamProcessor;
    }

    public Task OpenAsync(PartitionContext context)
    {
        return Task.CompletedTask;
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> eventDatas)
    {
        foreach(var eventData in eventDatas)
            eventStreamProcessor.Post(eventData);

        return Task.CompletedTask;
    }

    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        return Task.CompletedTask;
    }
}

public sealed class EventStreamProcessor : IDisposable
{
    private Subject<EventData> dataStream = new Subject<EventData>();
    private readonly IDisposable subscription;

    public EventStreamProcessor()
    {
        subscription = dataStream
            .Synchronize()
            .AsObservable()
            .Subscribe((evenData) => {
                // Do something
            })
    }

    public void Dispose()
    {
        dataStream.OnCompleted();
        subscription.Dispose();
    }

    public void Post(EventData eventData)
    {
        dataStream.OnNext(eventData);
    }
}

需要考虑的几件事:

  • 可靠的检查点将变得困难。您在将数据推送到 RX 流后的任何时刻调用 await context.CheckpointAsync();,但您不知道 RX 管道已经处理了数据。
  • 多个 EventProcessor 可以在任何给定时刻处于活动状态,因此请确保将数据推送到单个 RX 流。使用单例或使用 IEventProcessorFactory 接口
  • 的实现将相同的 RX 处理器注入每个 EventProcessor

我认为这个 的答案也与您相关。

有事件中心的实现作为 Akka.NET 反应流 Azure EventHub adapter 的来源。