Azure 云服务 - EventProcessor IEventProcessor.ProcessEventsAsync 未被命中
Azure Cloud Service - EventProcessor IEventProcessor.ProcessEventsAsync not getting hit
我遇到了辅助角色的问题,该角色似乎成功启动并注册了 EventProcessor class 实现(EventProcessorA 和 EventProcessorB),但它们都没有接收到任何事件。通过这个,我的意思是方法 IEventProcessor.ProcessEventsAsync 根本没有被击中。
每个 EventProcessor class 都有自己的事件中心。
我的日志显示为 EventProcessor classes 调用了构造函数和 OpenAsync 方法。事实上,它们恰好被调用了 4 次,如下所示。但在此之后,不会再发生 activity。我猜 4 次是因为有四个分区。
SimpleEventProcessorA - Constructor
SimpleEventProcessorA - OpenAsync
SimpleEventProcessorA - Constructor
SimpleEventProcessorA - OpenAsync
SimpleEventProcessorA - Constructor
SimpleEventProcessorA - OpenAsync
SimpleEventProcessorB - Constructor
SimpleEventProcessorB - Open Async
SimpleEventProcessorB - Constructor
SimpleEventProcessorB - OpenAsync
SimpleEventProcessorB - Constructor
SimpleEventProcessorB - OpenAsync
SimpleEventProcessorB - Constructor
SimpleEventProcessorB - OpenAsync
SimpleEventProcessorA - Constructor
SimpleEventProcessorA - OpenAsync
也没有为 Worker 角色的 RunAsync 方法中的 EventProcessorOptions 提供偏移量,因此所有事件都应该涌入。
此外,在 Azure 门户中,当我启动事件时,我看到事件正在发生。
注册事件处理器的工作者角色代码:
public class WorkerRole : RoleEntryPoint
{
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
private EventProcessorHost eventProcessorHostA;
private EventProcessorHost eventProcessorHostB;
public override void Run()
{
Trace.TraceInformation("ReportWorkerRole is running");
try
{
this.RunAsync(this.cancellationTokenSource.Token).Wait();
}
finally
{
this.runCompleteEvent.Set();
}
}
public override bool OnStart()
{
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
// For information on handling configuration changes
// see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.
bool result = base.OnStart();
Trace.TraceInformation("ReportWorkerRole has been started");
//EventHub Processing
try
{
string eventHubNameA = CloudConfigurationManager.GetSetting("EventHubNameA");
string eventHubNameB = CloudConfigurationManager.GetSetting("EventHubNameB");
string eventHubConnectionString = CloudConfigurationManager.GetSetting("EventHubConnectionString");
string storageAccountName = CloudConfigurationManager.GetSetting("AzureStorageAccount");
string storageAccountKey = CloudConfigurationManager.GetSetting("AzureStorageAccountKey");
string storageConnectionString = CloudConfigurationManager.GetSetting("AzureStorageAccountConnectionString");
string eventProcessorHostNameA = Guid.NewGuid().ToString();
eventProcessorHostA = new EventProcessorHost(eventProcessorHostNameA, eventHubNameA, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
string eventProcessorHostNameB = Guid.NewGuid().ToString();
eventProcessorHostB = new EventProcessorHost(eventProcessorHostNameB, eventHubNameB, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
}
catch (Exception ex)
{
//Logging omitted
}
return result;
}
public override void OnStop()
{
Trace.TraceInformation("ReportWorkerRole is stopping");
this.eventProcessorHostA.UnregisterEventProcessorAsync().Wait();
this.eventProcessorHostB.UnregisterEventProcessorAsync().Wait();
this.cancellationTokenSource.Cancel();
this.runCompleteEvent.WaitOne();
base.OnStop();
Trace.TraceInformation("ReportWorkerRole has stopped");
}
private async Task RunAsync(CancellationToken cancellationToken)
{
var options = new EventProcessorOptions()
{
MaxBatchSize = 100,
PrefetchCount = 10,
ReceiveTimeOut = TimeSpan.FromSeconds(20),
//InitialOffsetProvider = (partitionId) => DateTime.Now
};
options.ExceptionReceived += (sender, e) =>
{
//Logging omitted
};
//Tried both using await and wait
eventProcessorHostA.RegisterEventProcessorAsync<SimpleEventProcessorA>(options).Wait();
eventProcessorHostB.RegisterEventProcessorAsync<SimpleEventProcessorB>(options).Wait();
//await eventProcessorHostA.RegisterEventProcessorAsync<SimpleEventProcessorA>(options);
//await eventProcessorHostB.RegisterEventProcessorAsync<SimpleEventProcessorB>(options);
// TODO: Replace the following with your own logic.
while (!cancellationToken.IsCancellationRequested)
{
Trace.TraceInformation("Working");
await Task.Delay(1000);
}
}
}
事件处理器 A(与 B 的配置相同):
class SimpleEventProcessorA : IEventProcessor
{
Stopwatch checkpointStopWatch;
//Non-relevant variables omitted
public SimpleEventProcessorA()
{
try
{
//Initializing variables using CloudConfigurationManager
//Logging omitted
}
catch (Exception ex)
{
//Logging omitted
}
}
async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
{
//Logging omitted
Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
}
Task IEventProcessor.OpenAsync(PartitionContext context)
{
try
{
//Logging omitted
Console.WriteLine("Initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
this.checkpointStopWatch = new Stopwatch();
this.checkpointStopWatch.Start();
return Task.FromResult<object>(null);
}
catch (Exception ex)
{
//Logging omitted
return Task.FromResult<object>(null);
}
}
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
//Logging omitted
foreach (EventData eventData in messages)
{
try
{
//Logging omitted
Console.WriteLine(string.Format("Message received. Partition: '{0}', Data: '{1}'",
context.Lease.PartitionId, data));
}
catch (Exception ex)
{
//Logging omitted
throw;
}
}
//Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
{
await context.CheckpointAsync();
this.checkpointStopWatch.Restart();
}
}
}
非常感谢任何帮助,谢谢!
更新
看起来一切正常...这是我在将内容推送到事件中心时使用的连接字符串。
这是我的事件中心连接字符串:
Endpoint=sb://[myEventHubName].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;entityPath=[old-eventhub-name];SharedAccessKey=[mykey]
entityPath
设置不正确。它使用的是我设置的旧事件中心名称。它应该是为 eventHubNameA 或 eventHubNameB 设置的值。
回答问题,以便其他人可以从中受益。虽然答案在 "UPDATE" 部分的问题中有详细说明,但我在这里重申一下:
entityPath
设置不正确。它使用的是我设置的旧事件中心名称。它应该是为 eventHubNameA 或 eventHubNameB 设置的值。
而不是
Endpoint=sb://[myEventHubName].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;entityPath=[old-eventhub-name];SharedAccessKey=[mykey]
应该是Endpoint=sb://[myEventHubName].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;entityPath=[eventHubNameA];SharedAccessKey=[mykey]
我遇到了辅助角色的问题,该角色似乎成功启动并注册了 EventProcessor class 实现(EventProcessorA 和 EventProcessorB),但它们都没有接收到任何事件。通过这个,我的意思是方法 IEventProcessor.ProcessEventsAsync 根本没有被击中。
每个 EventProcessor class 都有自己的事件中心。
我的日志显示为 EventProcessor classes 调用了构造函数和 OpenAsync 方法。事实上,它们恰好被调用了 4 次,如下所示。但在此之后,不会再发生 activity。我猜 4 次是因为有四个分区。
SimpleEventProcessorA - Constructor
SimpleEventProcessorA - OpenAsync
SimpleEventProcessorA - Constructor
SimpleEventProcessorA - OpenAsync
SimpleEventProcessorA - Constructor
SimpleEventProcessorA - OpenAsync
SimpleEventProcessorB - Constructor
SimpleEventProcessorB - Open Async
SimpleEventProcessorB - Constructor
SimpleEventProcessorB - OpenAsync
SimpleEventProcessorB - Constructor
SimpleEventProcessorB - OpenAsync
SimpleEventProcessorB - Constructor
SimpleEventProcessorB - OpenAsync
SimpleEventProcessorA - Constructor
SimpleEventProcessorA - OpenAsync
也没有为 Worker 角色的 RunAsync 方法中的 EventProcessorOptions 提供偏移量,因此所有事件都应该涌入。
此外,在 Azure 门户中,当我启动事件时,我看到事件正在发生。
注册事件处理器的工作者角色代码:
public class WorkerRole : RoleEntryPoint
{
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
private EventProcessorHost eventProcessorHostA;
private EventProcessorHost eventProcessorHostB;
public override void Run()
{
Trace.TraceInformation("ReportWorkerRole is running");
try
{
this.RunAsync(this.cancellationTokenSource.Token).Wait();
}
finally
{
this.runCompleteEvent.Set();
}
}
public override bool OnStart()
{
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
// For information on handling configuration changes
// see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.
bool result = base.OnStart();
Trace.TraceInformation("ReportWorkerRole has been started");
//EventHub Processing
try
{
string eventHubNameA = CloudConfigurationManager.GetSetting("EventHubNameA");
string eventHubNameB = CloudConfigurationManager.GetSetting("EventHubNameB");
string eventHubConnectionString = CloudConfigurationManager.GetSetting("EventHubConnectionString");
string storageAccountName = CloudConfigurationManager.GetSetting("AzureStorageAccount");
string storageAccountKey = CloudConfigurationManager.GetSetting("AzureStorageAccountKey");
string storageConnectionString = CloudConfigurationManager.GetSetting("AzureStorageAccountConnectionString");
string eventProcessorHostNameA = Guid.NewGuid().ToString();
eventProcessorHostA = new EventProcessorHost(eventProcessorHostNameA, eventHubNameA, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
string eventProcessorHostNameB = Guid.NewGuid().ToString();
eventProcessorHostB = new EventProcessorHost(eventProcessorHostNameB, eventHubNameB, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
}
catch (Exception ex)
{
//Logging omitted
}
return result;
}
public override void OnStop()
{
Trace.TraceInformation("ReportWorkerRole is stopping");
this.eventProcessorHostA.UnregisterEventProcessorAsync().Wait();
this.eventProcessorHostB.UnregisterEventProcessorAsync().Wait();
this.cancellationTokenSource.Cancel();
this.runCompleteEvent.WaitOne();
base.OnStop();
Trace.TraceInformation("ReportWorkerRole has stopped");
}
private async Task RunAsync(CancellationToken cancellationToken)
{
var options = new EventProcessorOptions()
{
MaxBatchSize = 100,
PrefetchCount = 10,
ReceiveTimeOut = TimeSpan.FromSeconds(20),
//InitialOffsetProvider = (partitionId) => DateTime.Now
};
options.ExceptionReceived += (sender, e) =>
{
//Logging omitted
};
//Tried both using await and wait
eventProcessorHostA.RegisterEventProcessorAsync<SimpleEventProcessorA>(options).Wait();
eventProcessorHostB.RegisterEventProcessorAsync<SimpleEventProcessorB>(options).Wait();
//await eventProcessorHostA.RegisterEventProcessorAsync<SimpleEventProcessorA>(options);
//await eventProcessorHostB.RegisterEventProcessorAsync<SimpleEventProcessorB>(options);
// TODO: Replace the following with your own logic.
while (!cancellationToken.IsCancellationRequested)
{
Trace.TraceInformation("Working");
await Task.Delay(1000);
}
}
}
事件处理器 A(与 B 的配置相同):
class SimpleEventProcessorA : IEventProcessor
{
Stopwatch checkpointStopWatch;
//Non-relevant variables omitted
public SimpleEventProcessorA()
{
try
{
//Initializing variables using CloudConfigurationManager
//Logging omitted
}
catch (Exception ex)
{
//Logging omitted
}
}
async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
{
//Logging omitted
Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
}
Task IEventProcessor.OpenAsync(PartitionContext context)
{
try
{
//Logging omitted
Console.WriteLine("Initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
this.checkpointStopWatch = new Stopwatch();
this.checkpointStopWatch.Start();
return Task.FromResult<object>(null);
}
catch (Exception ex)
{
//Logging omitted
return Task.FromResult<object>(null);
}
}
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
//Logging omitted
foreach (EventData eventData in messages)
{
try
{
//Logging omitted
Console.WriteLine(string.Format("Message received. Partition: '{0}', Data: '{1}'",
context.Lease.PartitionId, data));
}
catch (Exception ex)
{
//Logging omitted
throw;
}
}
//Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
{
await context.CheckpointAsync();
this.checkpointStopWatch.Restart();
}
}
}
非常感谢任何帮助,谢谢!
更新
看起来一切正常...这是我在将内容推送到事件中心时使用的连接字符串。
这是我的事件中心连接字符串:
Endpoint=sb://[myEventHubName].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;entityPath=[old-eventhub-name];SharedAccessKey=[mykey]
entityPath
设置不正确。它使用的是我设置的旧事件中心名称。它应该是为 eventHubNameA 或 eventHubNameB 设置的值。
回答问题,以便其他人可以从中受益。虽然答案在 "UPDATE" 部分的问题中有详细说明,但我在这里重申一下:
entityPath
设置不正确。它使用的是我设置的旧事件中心名称。它应该是为 eventHubNameA 或 eventHubNameB 设置的值。
而不是
Endpoint=sb://[myEventHubName].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;entityPath=[old-eventhub-name];SharedAccessKey=[mykey]
应该是Endpoint=sb://[myEventHubName].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;entityPath=[eventHubNameA];SharedAccessKey=[mykey]