Service Fabric 中的事件中心使用者

Event Hub Consumer in Service Fabric

我正在尝试让服务结构始终如一地从 Azure 事件中心提取消息。我似乎已将所有内容连接起来,但注意到我的消费者只是停止拉取事件。

我有一个集线器,其中包含我已推送的几千个事件。用 1 个分区配置集线器,并让我的服务结构服务也只有 1 个分区以方便调试。

服务启动,创建 EventHubClient,然后使用它创建 PartitionReceiver。接收器被传递到一个“EventLoop”,它在调用 receiver.ReceiveAsync 时进入一个“无限”。 EventLoop 的代码如下。

我观察到的是第一次通过循环我几乎总是收到一条消息。第二次我收到了 103 到 200 条消息。在那之后,我没有收到任何消息。似乎如果我重新启动服务,我会再次收到相同的消息 - 但那是因为当我重新启动服务时,它会从流的开头重新开始。

希望它保持 运行 直到我的 2000 条消息被消耗,然后它会等待我(偶尔轮询)。

我是否需要对 Azure.Messaging.EventHubs 5.3.0 包做一些特定的事情才能让它继续拉动事件?

//Here is how I am creating the EventHubClient:
var connectionString = "something secret";
var connectionStringBuilder = new EventHubsConnectionStringBuilder(connectionString)
{
   EntityPath = "NameOfMyEventHub"
};
try
{
   m_eventHubClient = EventHubClient.Create(connectionStringBuilder);
}

//Here is how I am getting the partition receiver
var receiver = m_eventHubClient.CreateReceiver("$Default", m_partitionId, EventPosition.FromStart());

//The event loop which the receiver is passed to
private async Task EventLoop(PartitionReceiver receiver)
  {
     m_started = true;
     while (m_keepRunning)
     {
        var events = await receiver.ReceiveAsync(m_options.BatchSize, TimeSpan.FromSeconds(5));
        if (events != null) //First 2/3 times events aren't null. After that, always null and I know there are more in the partition/
        {
           var eventsArray = events as EventData[] ?? events.ToArray();
           m_state.NumProcessedSinceLastSave += eventsArray.Count();

           foreach (var evt in eventsArray)
           {
              //Process the event
              await m_options.Processor.ProcessMessageAsync(evt, null);

              string lastOffset = evt.SystemProperties.Offset;

              if (m_state.NumProcessedSinceLastSave >= m_options.BatchSize)
              {
                 m_state.Offset = lastOffset;
                 m_state.NumProcessedSinceLastSave = 0;
                 await m_state.SaveAsync();
              }
           }
        }
     }

     m_started = false;
  }

**编辑,有人问了一个关于分区数量的问题。事件中心只有一个分区,SF 服务也只有一个分区。

打算使用服务结构状态来跟踪我在集线器中的偏移量,但现在这不是问题。

为每个分区创建分区侦听器。我得到这样的分区:

public async Task StartAsync()
  {
     // slice the pie according to distribution
     // this partition can get one or more assigned Event Hub Partition ids
     string[] eventHubPartitionIds = (await m_eventHubClient.GetRuntimeInformationAsync()).PartitionIds;
     string[] resolvedEventHubPartitionIds = m_options.ResolveAssignedEventHubPartitions(eventHubPartitionIds);

     foreach (var resolvedPartition in resolvedEventHubPartitionIds)
     {
        var partitionReceiver = new EventHubListenerPartitionReceiver(m_eventHubClient, resolvedPartition, m_options);
        await partitionReceiver.StartAsync();
        m_partitionReceivers.Add(partitionReceiver);
     }
  }

调用partitionListener.StartAsync的时候,其实是创建了PartitionListener,像这样(其实比这个多了点,但是走的分支是这个:

m_eventHubClient.CreateReceiver(m_options.EventHubConsumerGroupName, m_partitionId, EventPosition.FromStart());

感谢您提供任何提示。 会

你有多少分区?我在您的代码中看不到您如何确保读取默认使用者组中的所有分区。

您使用 PartitionReceiver 而不是 EventProcessorHost 的具体原因是什么?

对我来说,SF 似乎非常适合使用事件处理器主机。我看到已经有一个 SF integrated solution 使用有状态服务进行检查点。