Any example of a WebJob using EventHub?

I've tried to work something out from the example in the WebJobs SDK on GitHub:

var eventHubConfig = new EventHubConfiguration();
string eventHubName = "MyHubName";
eventHubConfig.AddSender(eventHubName,"Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=SendRule;SharedAccessKey=xxxxxxxx");
eventHubConfig.AddReceiver(eventHubName, "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=ReceiveRule;SharedAccessKey=yyyyyyy");

config.UseEventHub(eventHubConfig);
JobHost host = new JobHost(config);

But I'm afraid that is far from enough for someone with my limited "skillset"!

Anyway, if anyone could provide a more complete example for a dummy like me, that would be great! Thanks

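EventProcessorHost is still required, because the WebJob only provides the hosting environment in which it runs. As far as I know, EventProcessorHost is not deeply integrated into WebJobs, so the WebJobs trigger mechanism cannot be used to process EventHub messages. I use a continuous WebJob to run EventProcessorHost: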

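using System;
using System.Configuration;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.ServiceBus.Messaging;

// The members below belong inside the WebJob's entry class (for example, Program).
public static void Main()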
{
    RunAsync().Wait();
}

private static async Task RunAsync()
{
    try
    {
        using (var shutdownWatcher = new WebJobsShutdownWatcher())
        {
            await Console.Out.WriteLineAsync("Initializing...");

            var eventProcessorHostName = "eventProcessorHostName";
            var eventHubName = ConfigurationManager.AppSettings["eventHubName"];
            var consumerGroupName = ConfigurationManager.AppSettings["eventHubConsumerGroupName"];
            var eventHubConnectionString = ConfigurationManager.ConnectionStrings["EventHub"].ConnectionString;
            var storageConnectionString = ConfigurationManager.ConnectionStrings["EventHubStorage"].ConnectionString;

            var eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName, consumerGroupName, eventHubConnectionString, storageConnectionString);

            await Console.Out.WriteLineAsync("Registering event processors...");

            var processorOptions = new EventProcessorOptions();

            processorOptions.ExceptionReceived += ProcessorOptions_ExceptionReceived;

            await eventProcessorHost.RegisterEventProcessorAsync<CustomEventProcessor>(processorOptions);

            await Console.Out.WriteLineAsync("Processing...");

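            try
            {
                // Block until the WebJob host signals shutdown.
                await Task.Delay(Timeout.Infinite, shutdownWatcher.Token);
            }
            catch (OperationCanceledException)
            {
                // Shutdown was requested; continue so the processors are unregistered.
            }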

            await Console.Out.WriteLineAsync("Unregistering event processors...");

            await eventProcessorHost.UnregisterEventProcessorAsync();

            await Console.Out.WriteLineAsync("Finished.");
        }
    }
    catch (Exception ex)
    {
        await HandleErrorAsync(ex);
    }
}

private static async void ProcessorOptions_ExceptionReceived(object sender, ExceptionReceivedEventArgs e)
{
    await HandleErrorAsync(e.Exception);
}

private static async Task HandleErrorAsync(Exception ex)
{
    await Console.Error.WriteLineAsync($"Critical error occurred: {ex.Message}{ex.StackTrace}");
}
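
The code above registers a CustomEventProcessor but does not show it. Below is a minimal sketch of what such a class could look like, assuming the Microsoft.ServiceBus.Messaging version of IEventProcessor. The class body is hypothetical: the UTF-8 text payload and the checkpoint-after-every-batch policy are assumptions, not something stated in the answer.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

// Hypothetical processor: the answer only references the type name.
public class CustomEventProcessor : IEventProcessor
{
    // Called once when this host acquires the lease for a partition.
    public Task OpenAsync(PartitionContext context)
    {
        return Console.Out.WriteLineAsync($"Opened partition {context.Lease.PartitionId}");
    }

    // Called with each batch of events read from the partition.
    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var eventData in messages)
        {
            // Assumption: event bodies are UTF-8 encoded text.
            var body = Encoding.UTF8.GetString(eventData.GetBytes());
            await Console.Out.WriteLineAsync($"Partition {context.Lease.PartitionId}: {body}");
        }

        // Record progress so that a restart resumes after this batch.
        await context.CheckpointAsync();
    }

    // Called when the lease is lost or the host is shutting down.
    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }
}
```

Checkpointing after every batch keeps the sketch simple; under heavy load you would probably checkpoint less often to reduce writes to the storage account.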

According to the documentation here, you should have all the information you need.

What you are missing is a reference to the Microsoft.Azure.WebJobs.ServiceBus.1.2.0-alpha-10291 NuGet package.

UseEventHub is an extension method declared in this package.

Otherwise your configuration seems fine. Here is an example of how to receive messages from or send messages to EventHub:

public class BasicTest
{
    public class Payload
    {
        public int Counter { get; set; }
    }
    public static void SendEvents([EventHub("MyHubName")] out Payload x)
    {
        x = new Payload { Counter = 100 };
    }

    public static void Trigger(
        [EventHubTrigger("MyHubName")] Payload x,
        [EventHub("MyHubName")] out Payload y)
    {
        x.Counter++;
        y = x;
    }
}
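
To tie the configuration from the question to the functions above, a host entry point could look roughly like the sketch below. It assumes the BasicTest class above is in the same project, that the default JobHostConfiguration can find its storage connection strings (AzureWebJobsStorage and AzureWebJobsDashboard) in the app configuration, and that the two placeholder connection strings are replaced with real ones. The Program class name and the manual call to SendEvents are illustrative choices, not part of the original answer.

```csharp
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.ServiceBus;

public class Program
{
    public static void Main()
    {
        var config = new JobHostConfiguration();

        // Register the hub for both sending and receiving.
        var eventHubConfig = new EventHubConfiguration();
        const string eventHubName = "MyHubName";
        eventHubConfig.AddSender(eventHubName, "<connection string with Send rights>");
        eventHubConfig.AddReceiver(eventHubName, "<connection string with Listen rights>");

        // Extension method from the Microsoft.Azure.WebJobs.ServiceBus package.
        config.UseEventHub(eventHubConfig);

        var host = new JobHost(config);

        // SendEvents has no trigger, so invoke it once by hand to publish a first event.
        host.Call(typeof(BasicTest).GetMethod("SendEvents"));

        // Listen for EventHubTrigger invocations until the WebJob is stopped.
        host.RunAndBlock();
    }
}
```

Note that Trigger both reads from and writes to "MyHubName", so each event it handles produces another one; for a real job you would likely write to a different hub.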