Service Fabric 中的事件中心使用者
Event Hub Consumer in Service Fabric
我正在尝试让服务结构始终如一地从 Azure 事件中心提取消息。我似乎已将所有内容连接起来,但注意到我的消费者只是停止拉取事件。
我有一个集线器,其中包含我已推送的几千个事件。用 1 个分区配置集线器,并让我的服务结构服务也只有 1 个分区以方便调试。
服务启动,创建 EventHubClient,然后使用它创建 PartitionReceiver。接收器被传递到一个“EventLoop”,它在调用 receiver.ReceiveAsync 时进入一个“无限”。 EventLoop 的代码如下。
我观察到的是第一次通过循环我几乎总是收到一条消息。第二次我收到了 103 到 200 条消息。在那之后,我没有收到任何消息。似乎如果我重新启动服务,我会再次收到相同的消息 - 但那是因为当我重新启动服务时,它会从流的开头重新开始。
希望它保持 运行 直到我的 2000 条消息被消耗,然后它会等待我(偶尔轮询)。
我是否需要对 Azure.Messaging.EventHubs 5.3.0 包做一些特定的事情才能让它继续拉动事件?
//Here is how I am creating the EventHubClient:
var connectionString = "something secret";
var connectionStringBuilder = new EventHubsConnectionStringBuilder(connectionString)
{
EntityPath = "NameOfMyEventHub"
};
try
{
m_eventHubClient = EventHubClient.Create(connectionStringBuilder);
}
//Here is how I am getting the partition receiver
var receiver = m_eventHubClient.CreateReceiver("$Default", m_partitionId, EventPosition.FromStart());
//The event loop which the receiver is passed to
private async Task EventLoop(PartitionReceiver receiver)
{
m_started = true;
while (m_keepRunning)
{
var events = await receiver.ReceiveAsync(m_options.BatchSize, TimeSpan.FromSeconds(5));
if (events != null) //First 2/3 times events aren't null. After that, always null and I know there are more in the partition/
{
var eventsArray = events as EventData[] ?? events.ToArray();
m_state.NumProcessedSinceLastSave += eventsArray.Count();
foreach (var evt in eventsArray)
{
//Process the event
await m_options.Processor.ProcessMessageAsync(evt, null);
string lastOffset = evt.SystemProperties.Offset;
if (m_state.NumProcessedSinceLastSave >= m_options.BatchSize)
{
m_state.Offset = lastOffset;
m_state.NumProcessedSinceLastSave = 0;
await m_state.SaveAsync();
}
}
}
}
m_started = false;
}
**编辑,有人问了一个关于分区数量的问题。事件中心只有一个分区,SF 服务也只有一个分区。
打算使用服务结构状态来跟踪我在集线器中的偏移量,但现在这不是问题。
为每个分区创建分区侦听器。我得到这样的分区:
public async Task StartAsync()
{
// slice the pie according to distribution
// this partition can get one or more assigned Event Hub Partition ids
string[] eventHubPartitionIds = (await m_eventHubClient.GetRuntimeInformationAsync()).PartitionIds;
string[] resolvedEventHubPartitionIds = m_options.ResolveAssignedEventHubPartitions(eventHubPartitionIds);
foreach (var resolvedPartition in resolvedEventHubPartitionIds)
{
var partitionReceiver = new EventHubListenerPartitionReceiver(m_eventHubClient, resolvedPartition, m_options);
await partitionReceiver.StartAsync();
m_partitionReceivers.Add(partitionReceiver);
}
}
调用partitionListener.StartAsync的时候,其实是创建了PartitionListener,像这样(其实比这个多了点,但是走的分支是这个:
m_eventHubClient.CreateReceiver(m_options.EventHubConsumerGroupName, m_partitionId, EventPosition.FromStart());
感谢您提供任何提示。
会
你有多少分区?我在您的代码中看不到您如何确保读取默认使用者组中的所有分区。
您使用 PartitionReceiver
而不是 EventProcessorHost 的具体原因是什么?
对我来说,SF 似乎非常适合使用事件处理器主机。我看到已经有一个 SF integrated solution 使用有状态服务进行检查点。
我正在尝试让服务结构始终如一地从 Azure 事件中心提取消息。我似乎已将所有内容连接起来,但注意到我的消费者只是停止拉取事件。
我有一个集线器,其中包含我已推送的几千个事件。用 1 个分区配置集线器,并让我的服务结构服务也只有 1 个分区以方便调试。
服务启动,创建 EventHubClient,然后使用它创建 PartitionReceiver。接收器被传递到一个“EventLoop”,它在调用 receiver.ReceiveAsync 时进入一个“无限”。 EventLoop 的代码如下。
我观察到的是第一次通过循环我几乎总是收到一条消息。第二次我收到了 103 到 200 条消息。在那之后,我没有收到任何消息。似乎如果我重新启动服务,我会再次收到相同的消息 - 但那是因为当我重新启动服务时,它会从流的开头重新开始。
希望它保持 运行 直到我的 2000 条消息被消耗,然后它会等待我(偶尔轮询)。
我是否需要对 Azure.Messaging.EventHubs 5.3.0 包做一些特定的事情才能让它继续拉动事件?
//Here is how I am creating the EventHubClient:
var connectionString = "something secret";
var connectionStringBuilder = new EventHubsConnectionStringBuilder(connectionString)
{
EntityPath = "NameOfMyEventHub"
};
try
{
m_eventHubClient = EventHubClient.Create(connectionStringBuilder);
}
//Here is how I am getting the partition receiver
var receiver = m_eventHubClient.CreateReceiver("$Default", m_partitionId, EventPosition.FromStart());
//The event loop which the receiver is passed to
private async Task EventLoop(PartitionReceiver receiver)
{
m_started = true;
while (m_keepRunning)
{
var events = await receiver.ReceiveAsync(m_options.BatchSize, TimeSpan.FromSeconds(5));
if (events != null) //First 2/3 times events aren't null. After that, always null and I know there are more in the partition/
{
var eventsArray = events as EventData[] ?? events.ToArray();
m_state.NumProcessedSinceLastSave += eventsArray.Count();
foreach (var evt in eventsArray)
{
//Process the event
await m_options.Processor.ProcessMessageAsync(evt, null);
string lastOffset = evt.SystemProperties.Offset;
if (m_state.NumProcessedSinceLastSave >= m_options.BatchSize)
{
m_state.Offset = lastOffset;
m_state.NumProcessedSinceLastSave = 0;
await m_state.SaveAsync();
}
}
}
}
m_started = false;
}
**编辑,有人问了一个关于分区数量的问题。事件中心只有一个分区,SF 服务也只有一个分区。
打算使用服务结构状态来跟踪我在集线器中的偏移量,但现在这不是问题。
为每个分区创建分区侦听器。我得到这样的分区:
public async Task StartAsync()
{
// slice the pie according to distribution
// this partition can get one or more assigned Event Hub Partition ids
string[] eventHubPartitionIds = (await m_eventHubClient.GetRuntimeInformationAsync()).PartitionIds;
string[] resolvedEventHubPartitionIds = m_options.ResolveAssignedEventHubPartitions(eventHubPartitionIds);
foreach (var resolvedPartition in resolvedEventHubPartitionIds)
{
var partitionReceiver = new EventHubListenerPartitionReceiver(m_eventHubClient, resolvedPartition, m_options);
await partitionReceiver.StartAsync();
m_partitionReceivers.Add(partitionReceiver);
}
}
调用partitionListener.StartAsync的时候,其实是创建了PartitionListener,像这样(其实比这个多了点,但是走的分支是这个:
m_eventHubClient.CreateReceiver(m_options.EventHubConsumerGroupName, m_partitionId, EventPosition.FromStart());
感谢您提供任何提示。 会
你有多少分区?我在您的代码中看不到您如何确保读取默认使用者组中的所有分区。
您使用 PartitionReceiver
而不是 EventProcessorHost 的具体原因是什么?
对我来说,SF 似乎非常适合使用事件处理器主机。我看到已经有一个 SF integrated solution 使用有状态服务进行检查点。