如何在 azure webjob 或函数中触发多个 eventhub

How to trigger to multiple eventhubs in azure webjob or function

我有多个位于不同 Azure 区域的 Azure 事件中心。我想编写一个 webjob/function 可以接收来自所有这些事件中心的消息,而无需硬编码要收听哪些 EH。

所以,通常你会有这样的东西,一个函数可以从配置中定义的一个 EH 接收消息:

public void Func([EventHubTrigger("%eventhubInName%")] string data)

就我而言,我有 6 个不同的 EventHub,分布在 6 个蔚蓝区域。我想要的是在启动时以某种方式设置我的函数以监听多个事件中心,这样我就不需要硬编码 6 个相同的函数(Func1、Func2..)或托管我的 webjob 6 次。

这有可能吗?例如,在 webjob 启动期间做一些事情?

单个 Azure 函数不能链接到多个触发器,据我所知,WebJob SDK 也不能这样做。显然,您可以编写自己的 Web 作业(非 SDK),但我想这不是您要的。

最简单的方法可能是写一个辅助方法:

public void Impl(string data) { ... }

然后用不同的事件中心定义 6 个相同的函数:

public void Func1([EventHubTrigger("%hub1%", Connection = "Conn1")] string data) 
    => Impl(data);

public void Func6([EventHubTrigger("%hub6%", Connection = "Conn6")] string data) 
    => Impl(data);

您还可以在 6 个文件夹中手动创建 6 个 function.json 文件,并使它们指向完全相同的 Impl 函数,但具有不同的事件中心设置。

我知道这个帖子太旧了,无法回答,但最近我在处理类似的需求(其中要求我们在引入新客户时为每个客户动态添加一个 EventHub)。

我通过开发 WebJob 解决了这个问题,它每小时从数据库中提取 EventHub 连接详细信息(当然还有它的主要功能)。如果在数据库中添加了任何新的 EventHub,那么它只会为该 EventHub 生成一个新线程。 此外,如果任何 EventHub 已从数据库中删除,则它会终止该特定线程。

我通过使用静态列表实现了这一点,每个 eventhub 都有唯一的 ID。

示例代码:

private static List<Guid> _existingEventHubs; // static list consisting of unique identifier for each EventHub. It will be empty at first.


List<EventHubMappingDetails> eventHubDetails = // a database call here. This list will have eventHub connection details.

            if (eventHubDetails.Any())
            {

                //Logic to create new ConsumerRead Thread only for newly created event hubs and to kill the threads if the eventhub has been removed.
                if (_existingEventHubs.Count > 0) // check if there are already any entries available in the list
                {
                    //addDetails list contains the eventHub details for newly added eventHubs
                    List<EventHubMapping> addDetails = eventHubDetails.Where(_ => !_existingEventHubs.Contains(_.EHId)).ToList();

                    //removeDetails list contains eventhubMappingIds for the eventHubs which has been removed
                    List<Guid> removeDetails = _existingEventHubs.Where(_ => !eventHubDetails.Select(o => o.EHId).Contains(_)).ToList();
                    if (addDetails.Count > 0)
                    {
                        _existingEventHubs.AddRange(addDetails.Select(_ => _.EHId));

                        eventHubConnectionDetails = addDetails.Select(_ => new EventHubConnectionDetails()
                        {
                            connectionString = "Endpoint= + _.EHHostNameRead + ;SharedAccessKeyName= + _.EHSharedAccessKeyNameRead + ;SharedAccessKey= + _.EHSharedAccessKeyValueRead + ;EntityPath= + _.EHEntityPathRead",
                            counsumerGroup = // consumer Group Name here,
                            eventHubName = _.EHEntityPathRead,
                            ID = _.EHId.ToString()
                        }).ToList();

                        //Call to ConsumerReadEvent method to create new thread for all the newly created eventHubs
                        await _eventHubConsumerBusiness.ConsumerReadEvent(eventHubConnectionDetails);
                    }
                    if (removeDetails.Count > 0)
                    {
                        List<string> EHTobeRemovedList = removeDetails.Select(_ => _.ToString()).ToList();
                        _existingEventHubs.RemoveAll(_ => removeDetails.Contains(_));// remove all the EventHub unique identifiers from the static list if these are removed from data base
                    }

                }
                else
                {
                    _existingEventHubs.AddRange(eventHubDetails.Select(_ => _.EHId));

                    eventHubConnectionDetails = eventHubDetails.Select(_ => new EventHubConnectionDetails()
                    {
                        connectionString = "Endpoint= + _.EHHostNameRead + ;SharedAccessKeyName= + _.EHSharedAccessKeyNameRead + ;SharedAccessKey= + _.EHSharedAccessKeyValueRead + ;EntityPath= + _.EHEntityPathRead",
                        counsumerGroup = // consumer Group Name here,
                        eventHubName = _.EHEntityPathRead,
                        ID = _.EHId.ToString()
                    }).ToList();
                    await _eventHubConsumerBusiness.ConsumerReadEvent(eventHubConnectionDetails);
                }
            }

然后为 eventHubConnectionDetails 列表的每个 eventHub 添加 Parallel.ForEach

为每个线程添加以下代码,

EventProcessorClient processor = new EventProcessorClient(storageClient, eventHubConnectionDetails.consumerGroup, eventHubConnectionDetails.connectionString, eventHubConnectionDetails.eventHubName);
try
{
processor.ProcessEventAsync += ProcessEventHandler;
                processor.ProcessErrorAsync += ProcessErrorHandler;
                await processor.StartProcessingAsync();

                //Logic to delay the StopProcessingAsync till _existingEventHubs list contains the eventHubMappingId for the currently running eventHub
                while (_existingEventHubs.Contains(eventHubConnectionDetails.ID))
                {
                    await Task.Delay(TimeSpan.FromMinutes(5));
                }

                await processor.StopProcessingAsync();
}
catch
{
}