无法在 Web 应用程序中执行 RecieveEvent() 和 RegisterEventProcessorAsync 无法使用 azure 事件中心
unable to execute RecieveEvent() and RegisterEventProcessorAsync inside web application failed to use azure event hub
控制台应用程序与 Azure Event Hub
完美配合,但当我尝试将
RecieveEvent()
Index.aspx.cs
中的代码收到一个调用,但此后它无法执行
eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(options).Wait(); and `SimpleEventProcessor.cs` code why so ?
如何在网络应用中实现?
Index.aspx.cs
[WebMethod]
public static void RecieveEvent()
{
string eventHubConnectionString = "Endpoint=sb://rpidemoeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=wyi1HpKwEIOpiSPnQaCloPr9ELhESOSD/F2SJiY0RFU=";
string eventHubName = "myeventhubname";
string storageAccountName = "mystoragename";
string storageAccountKey = "mykey";
string storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", storageAccountName, storageAccountKey);
string eventProcessorHostName = Guid.NewGuid().ToString();
EventProcessorHost eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
//Console.WriteLine("Registering EventProcessor...");
var options = new EventProcessorOptions();
options.ExceptionReceived += (sender, e) => { Console.WriteLine(e.Exception); };
eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(options).Wait();
//Console.WriteLine("Receiving. Press enter key to stop worker.");
//Console.ReadLine();
eventProcessorHost.UnregisterEventProcessorAsync().Wait();
}
SimpleEventProcessor
public class SimpleEventProcessor : IEventProcessor
{
Stopwatch checkpointStopWatch;
async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
{
// Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
}
Task IEventProcessor.OpenAsync(PartitionContext context)
{
// Console.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
this.checkpointStopWatch = new Stopwatch();
this.checkpointStopWatch.Start();
return Task.FromResult<object>(null);
}
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (EventData eventData in messages)
{
string data = Encoding.UTF8.GetString(eventData.GetBytes());
//TODO: sed mail notification
if (Convert.ToInt32(data) > 25 && Index.isMail == false)
{
SendMail();
Index.isMail = true;
}
//Console.WriteLine(string.Format("Message received. Partition: '{0}', Data: '{1}'",
// context.Lease.PartitionId, data));
}
//Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
{
await context.CheckpointAsync();
this.checkpointStopWatch.Restart();
}
}
无法执行上面的调用,调用不来这里为什么?
不使用任何工作者角色或 webjob 可以工作吗?
除了 p e p 的第一条解释为什么它不起作用的评论外,请考虑以下内容:
您应该看到 EventProcessor
的目的是在连续过程/循环中将传入的事件流读取到 EventHub
(ProcessEventsAsync
方法在 EventProcessor
instance is up and 运行ning. 这就是为什么 Web 作业/工作者角色比在 Web 应用程序页面中更适合此操作,因为这意味着在合理的时间内处理请求.
如果您确实希望它在网络应用程序页面中 运行,请不要使用 EventProcessor
,而是创建一个直接接收器(https://azure.microsoft.com/en-us/documentation/articles/event-hubs-programming-guide/#event-consumers) and specify an amount of messsage you want to receive using the Receive(In32)
method: https://msdn.microsoft.com/en-us/library/azure/dn790451.aspx。这样您就可以处理和显示有限数量的消息。
我不知道您的具体用例,但我们将 EventProcessor
托管在一个持续 运行 的 Web 作业中,并且当 Web 作业由于停止命令或我们的其他原因而关闭时可以通过调用 UnregisterEventProcessorAsync()
.
优雅地停止 EventProcessor
控制台应用程序与 Azure Event Hub
完美配合,但当我尝试将
RecieveEvent()
Index.aspx.cs
中的代码收到一个调用,但此后它无法执行
eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(options).Wait(); and `SimpleEventProcessor.cs` code why so ?
如何在网络应用中实现?
Index.aspx.cs
[WebMethod]
public static void RecieveEvent()
{
string eventHubConnectionString = "Endpoint=sb://rpidemoeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=wyi1HpKwEIOpiSPnQaCloPr9ELhESOSD/F2SJiY0RFU=";
string eventHubName = "myeventhubname";
string storageAccountName = "mystoragename";
string storageAccountKey = "mykey";
string storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", storageAccountName, storageAccountKey);
string eventProcessorHostName = Guid.NewGuid().ToString();
EventProcessorHost eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
//Console.WriteLine("Registering EventProcessor...");
var options = new EventProcessorOptions();
options.ExceptionReceived += (sender, e) => { Console.WriteLine(e.Exception); };
eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(options).Wait();
//Console.WriteLine("Receiving. Press enter key to stop worker.");
//Console.ReadLine();
eventProcessorHost.UnregisterEventProcessorAsync().Wait();
}
SimpleEventProcessor
public class SimpleEventProcessor : IEventProcessor
{
Stopwatch checkpointStopWatch;
async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
{
// Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
}
Task IEventProcessor.OpenAsync(PartitionContext context)
{
// Console.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
this.checkpointStopWatch = new Stopwatch();
this.checkpointStopWatch.Start();
return Task.FromResult<object>(null);
}
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (EventData eventData in messages)
{
string data = Encoding.UTF8.GetString(eventData.GetBytes());
//TODO: sed mail notification
if (Convert.ToInt32(data) > 25 && Index.isMail == false)
{
SendMail();
Index.isMail = true;
}
//Console.WriteLine(string.Format("Message received. Partition: '{0}', Data: '{1}'",
// context.Lease.PartitionId, data));
}
//Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
{
await context.CheckpointAsync();
this.checkpointStopWatch.Restart();
}
}
无法执行上面的调用,调用不来这里为什么? 不使用任何工作者角色或 webjob 可以工作吗?
除了 p e p 的第一条解释为什么它不起作用的评论外,请考虑以下内容:
您应该看到 EventProcessor
的目的是在连续过程/循环中将传入的事件流读取到 EventHub
(ProcessEventsAsync
方法在 EventProcessor
instance is up and 运行ning. 这就是为什么 Web 作业/工作者角色比在 Web 应用程序页面中更适合此操作,因为这意味着在合理的时间内处理请求.
如果您确实希望它在网络应用程序页面中 运行,请不要使用 EventProcessor
,而是创建一个直接接收器(https://azure.microsoft.com/en-us/documentation/articles/event-hubs-programming-guide/#event-consumers) and specify an amount of messsage you want to receive using the Receive(In32)
method: https://msdn.microsoft.com/en-us/library/azure/dn790451.aspx。这样您就可以处理和显示有限数量的消息。
我不知道您的具体用例,但我们将 EventProcessor
托管在一个持续 运行 的 Web 作业中,并且当 Web 作业由于停止命令或我们的其他原因而关闭时可以通过调用 UnregisterEventProcessorAsync()
.
EventProcessor