任何使用 EventHub 的 WebJob 示例?
Any Example of WebJob using EventHub?
我试图参考 GitHub 上 WebJobs SDK 的示例写出一些代码:
// Registers one sender and one receiver connection string for the same
// Event Hub, attaches the Event Hub configuration to the host configuration
// and creates the JobHost from it.
// NOTE(review): `config` is not declared in this snippet - presumably a
// JobHostConfiguration created earlier; confirm against the full sample.
const string eventHubName = "MyHubName";
var senderConnection = "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=SendRule;SharedAccessKey=xxxxxxxx";
var receiverConnection = "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=ReceiveRule;SharedAccessKey=yyyyyyy";

var eventHubConfig = new EventHubConfiguration();
eventHubConfig.AddSender(eventHubName, senderConnection);
eventHubConfig.AddReceiver(eventHubName, receiverConnection);

config.UseEventHub(eventHubConfig);
var host = new JobHost(config);
但对于我这样有限的 "skillset" 的人来说,这恐怕还不够!
我找不到带有 UseEventHub 属性的 JobHostConfiguration 实例(使用的是 Microsoft.Azure.WebJobs 包的 v1.2.0-alpha-10291 版本),所以我无法将 EventHubConfiguration 传递给 JobHost。
我以前使用过 EventHub,但不是在 WebJob 环境中。我不清楚在使用 WebJob 触发器时是否仍然需要 EventProcessorHost……还是说 WebJob 触发器本质上就充当了 EventProcessorHost?
无论如何,如果有人能为像我这样的傻瓜提供更完整的示例,那就太好了!谢谢
EventProcessorHost 仍然是必需的,因为 WebJob 只是为运行它提供托管环境。据我所知,EventProcessorHost 并没有与 WebJob 深度集成,所以 WebJob 的触发机制不能用于处理 EventHub 消息。我使用连续运行(continuous)的 WebJob 来运行 EventProcessorHost:
/// <summary>
/// Entry point: starts <c>RunAsync</c> and blocks the calling thread until
/// the returned task has finished.
/// </summary>
public static void Main() => RunAsync().Wait();
/// <summary>
/// Creates an <c>EventProcessorHost</c> from application settings, registers
/// <c>CustomEventProcessor</c> on it, waits until the WebJobs shutdown token is
/// signalled and then unregisters the processor.
/// </summary>
/// <remarks>
/// Any exception raised during setup, processing or teardown is passed to
/// <c>HandleErrorAsync</c> instead of being propagated to the caller.
/// </remarks>
/// <returns>A task that completes once the processor has been unregistered or an error has been reported.</returns>
private static async Task RunAsync()
{
    try
    {
        using (var shutdownWatcher = new WebJobsShutdownWatcher())
        {
            await Console.Out.WriteLineAsync("Initializing...");
            var eventProcessorHostName = "eventProcessorHostName";
            var eventHubName = ConfigurationManager.AppSettings["eventHubName"];
            var consumerGroupName = ConfigurationManager.AppSettings["eventHubConsumerGroupName"];
            var eventHubConnectionString = ConfigurationManager.ConnectionStrings["EventHub"].ConnectionString;
            var storageConnectionString = ConfigurationManager.ConnectionStrings["EventHubStorage"].ConnectionString;
            var eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName, consumerGroupName, eventHubConnectionString, storageConnectionString);

            await Console.Out.WriteLineAsync("Registering event processors...");
            var processorOptions = new EventProcessorOptions();
            processorOptions.ExceptionReceived += ProcessorOptions_ExceptionReceived;
            await eventProcessorHost.RegisterEventProcessorAsync<CustomEventProcessor>(processorOptions);

            await Console.Out.WriteLineAsync("Processing...");
            try
            {
                // Completes only by cancellation: when the shutdown token fires,
                // the awaited delay throws TaskCanceledException.
                await Task.Delay(Timeout.Infinite, shutdownWatcher.Token);
            }
            catch (OperationCanceledException)
            {
                // Expected on shutdown. Swallow it here so the unregister step
                // below still runs and shutdown is not reported as a critical error.
            }

            await Console.Out.WriteLineAsync("Unregistering event processors...");
            await eventProcessorHost.UnregisterEventProcessorAsync();
            await Console.Out.WriteLineAsync("Finished.");
        }
    }
    catch (Exception ex)
    {
        await HandleErrorAsync(ex);
    }
}
/// <summary>
/// Event handler for <c>EventProcessorOptions.ExceptionReceived</c>; forwards
/// the exception carried by the event arguments to <c>HandleErrorAsync</c>.
/// </summary>
/// <param name="sender">The object that raised the event (unused).</param>
/// <param name="e">Event arguments carrying the reported exception.</param>
private static async void ProcessorOptions_ExceptionReceived(object sender, ExceptionReceivedEventArgs e)
{
    var reported = e.Exception;
    await HandleErrorAsync(reported);
}
/// <summary>
/// Writes a single "critical error" line describing <paramref name="ex"/> to
/// the standard error stream.
/// </summary>
/// <param name="ex">The exception to report.</param>
/// <returns>A task that completes when the line has been written.</returns>
private static async Task HandleErrorAsync(Exception ex)
{
    // Formatting the exception itself (Exception.ToString()) emits the type,
    // message, inner exceptions and stack trace with proper separators.
    // Concatenating Message and StackTrace dropped the inner exceptions and
    // ran both parts together. A null exception formats as an empty string
    // rather than throwing.
    await Console.Error.WriteLineAsync($"Critical error occured: {ex}");
}
根据此处链接的文档,您应该已经拥有所需的所有信息。
您缺少的是对 Microsoft.Azure.WebJobs.ServiceBus 1.2.0-alpha-10291 NuGet 包的引用。
UseEventHub 是在此包中声明的扩展方法。
除此之外,你的配置看起来没有问题。
以下是有关如何从 EventHub 接收消息或向 EventHub 发送消息的示例:
/// <summary>
/// Sample functions bound to the "MyHubName" Event Hub through the
/// <c>EventHub</c> and <c>EventHubTrigger</c> parameter attributes.
/// </summary>
public class BasicTest
{
    /// <summary>Message body used by the sample functions.</summary>
    public class Payload
    {
        /// <summary>Counter value carried by the message.</summary>
        public int Counter { get; set; }
    }

    /// <summary>
    /// Produces one payload whose <c>Counter</c> is 100 through the
    /// <c>EventHub</c>-bound out parameter.
    /// </summary>
    public static void SendEvents([EventHub("MyHubName")] out Payload x)
    {
        var initial = new Payload();
        initial.Counter = 100;
        x = initial;
    }

    /// <summary>
    /// Takes the payload bound by <c>EventHubTrigger</c>, increments its
    /// counter and assigns the same instance to the <c>EventHub</c>-bound
    /// out parameter.
    /// </summary>
    public static void Trigger(
        [EventHubTrigger("MyHubName")] Payload x,
        [EventHub("MyHubName")] out Payload y)
    {
        x.Counter += 1;
        y = x;
    }
}
我试图参考 GitHub 上 WebJobs SDK 的示例写出一些代码:
// Registers one sender and one receiver connection string for the same
// Event Hub, attaches the Event Hub configuration to the host configuration
// and creates the JobHost from it.
// NOTE(review): `config` is not declared in this snippet - presumably a
// JobHostConfiguration created earlier; confirm against the full sample.
const string eventHubName = "MyHubName";
var senderConnection = "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=SendRule;SharedAccessKey=xxxxxxxx";
var receiverConnection = "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=ReceiveRule;SharedAccessKey=yyyyyyy";

var eventHubConfig = new EventHubConfiguration();
eventHubConfig.AddSender(eventHubName, senderConnection);
eventHubConfig.AddReceiver(eventHubName, receiverConnection);

config.UseEventHub(eventHubConfig);
var host = new JobHost(config);
但对于我这样有限的 "skillset" 的人来说,这恐怕还不够!
我找不到带有 UseEventHub 属性的 JobHostConfiguration 实例(使用的是 Microsoft.Azure.WebJobs 包的 v1.2.0-alpha-10291 版本),所以我无法将 EventHubConfiguration 传递给 JobHost。
我以前使用过 EventHub,但不是在 WebJob 环境中。我不清楚在使用 WebJob 触发器时是否仍然需要 EventProcessorHost……还是说 WebJob 触发器本质上就充当了 EventProcessorHost?
无论如何,如果有人能为像我这样的傻瓜提供更完整的示例,那就太好了!谢谢
EventProcessorHost 仍然是必需的,因为 WebJob 只是为运行它提供托管环境。据我所知,EventProcessorHost 并没有与 WebJob 深度集成,所以 WebJob 的触发机制不能用于处理 EventHub 消息。我使用连续运行(continuous)的 WebJob 来运行 EventProcessorHost:
/// <summary>
/// Entry point: starts <c>RunAsync</c> and blocks the calling thread until
/// the returned task has finished.
/// </summary>
public static void Main() => RunAsync().Wait();
/// <summary>
/// Creates an <c>EventProcessorHost</c> from application settings, registers
/// <c>CustomEventProcessor</c> on it, waits until the WebJobs shutdown token is
/// signalled and then unregisters the processor.
/// </summary>
/// <remarks>
/// Any exception raised during setup, processing or teardown is passed to
/// <c>HandleErrorAsync</c> instead of being propagated to the caller.
/// </remarks>
/// <returns>A task that completes once the processor has been unregistered or an error has been reported.</returns>
private static async Task RunAsync()
{
    try
    {
        using (var shutdownWatcher = new WebJobsShutdownWatcher())
        {
            await Console.Out.WriteLineAsync("Initializing...");
            var eventProcessorHostName = "eventProcessorHostName";
            var eventHubName = ConfigurationManager.AppSettings["eventHubName"];
            var consumerGroupName = ConfigurationManager.AppSettings["eventHubConsumerGroupName"];
            var eventHubConnectionString = ConfigurationManager.ConnectionStrings["EventHub"].ConnectionString;
            var storageConnectionString = ConfigurationManager.ConnectionStrings["EventHubStorage"].ConnectionString;
            var eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName, consumerGroupName, eventHubConnectionString, storageConnectionString);

            await Console.Out.WriteLineAsync("Registering event processors...");
            var processorOptions = new EventProcessorOptions();
            processorOptions.ExceptionReceived += ProcessorOptions_ExceptionReceived;
            await eventProcessorHost.RegisterEventProcessorAsync<CustomEventProcessor>(processorOptions);

            await Console.Out.WriteLineAsync("Processing...");
            try
            {
                // Completes only by cancellation: when the shutdown token fires,
                // the awaited delay throws TaskCanceledException.
                await Task.Delay(Timeout.Infinite, shutdownWatcher.Token);
            }
            catch (OperationCanceledException)
            {
                // Expected on shutdown. Swallow it here so the unregister step
                // below still runs and shutdown is not reported as a critical error.
            }

            await Console.Out.WriteLineAsync("Unregistering event processors...");
            await eventProcessorHost.UnregisterEventProcessorAsync();
            await Console.Out.WriteLineAsync("Finished.");
        }
    }
    catch (Exception ex)
    {
        await HandleErrorAsync(ex);
    }
}
/// <summary>
/// Event handler for <c>EventProcessorOptions.ExceptionReceived</c>; forwards
/// the exception carried by the event arguments to <c>HandleErrorAsync</c>.
/// </summary>
/// <param name="sender">The object that raised the event (unused).</param>
/// <param name="e">Event arguments carrying the reported exception.</param>
private static async void ProcessorOptions_ExceptionReceived(object sender, ExceptionReceivedEventArgs e)
{
    var reported = e.Exception;
    await HandleErrorAsync(reported);
}
/// <summary>
/// Writes a single "critical error" line describing <paramref name="ex"/> to
/// the standard error stream.
/// </summary>
/// <param name="ex">The exception to report.</param>
/// <returns>A task that completes when the line has been written.</returns>
private static async Task HandleErrorAsync(Exception ex)
{
    // Formatting the exception itself (Exception.ToString()) emits the type,
    // message, inner exceptions and stack trace with proper separators.
    // Concatenating Message and StackTrace dropped the inner exceptions and
    // ran both parts together. A null exception formats as an empty string
    // rather than throwing.
    await Console.Error.WriteLineAsync($"Critical error occured: {ex}");
}
根据此处链接的文档,您应该已经拥有所需的所有信息。
您缺少的是对 Microsoft.Azure.WebJobs.ServiceBus 1.2.0-alpha-10291 NuGet 包的引用。
UseEventHub 是在此包中声明的扩展方法。
除此之外,你的配置看起来没有问题。
以下是有关如何从 EventHub 接收消息或向 EventHub 发送消息的示例:
/// <summary>
/// Sample functions bound to the "MyHubName" Event Hub through the
/// <c>EventHub</c> and <c>EventHubTrigger</c> parameter attributes.
/// </summary>
public class BasicTest
{
    /// <summary>Message body used by the sample functions.</summary>
    public class Payload
    {
        /// <summary>Counter value carried by the message.</summary>
        public int Counter { get; set; }
    }

    /// <summary>
    /// Produces one payload whose <c>Counter</c> is 100 through the
    /// <c>EventHub</c>-bound out parameter.
    /// </summary>
    public static void SendEvents([EventHub("MyHubName")] out Payload x)
    {
        var initial = new Payload();
        initial.Counter = 100;
        x = initial;
    }

    /// <summary>
    /// Takes the payload bound by <c>EventHubTrigger</c>, increments its
    /// counter and assigns the same instance to the <c>EventHub</c>-bound
    /// out parameter.
    /// </summary>
    public static void Trigger(
        [EventHubTrigger("MyHubName")] Payload x,
        [EventHub("MyHubName")] out Payload y)
    {
        x.Counter += 1;
        y = x;
    }
}