如何在 azure webjob 或函数中触发多个 eventhub
How to trigger to multiple eventhubs in azure webjob or function
我有多个位于不同 Azure 区域的 Azure 事件中心。我想编写一个 webjob/function 可以接收来自所有这些事件中心的消息,而无需硬编码要收听哪些 EH。
所以,通常你会有这样的东西,一个函数可以从配置中定义的一个 EH 接收消息:
public void Func([EventHubTrigger("%eventhubInName%")] string data)
就我而言,我有 6 个不同的 EventHub,分布在 6 个蔚蓝区域。我想要的是在启动时以某种方式设置我的函数以监听多个事件中心,这样我就不需要硬编码 6 个相同的函数(Func1、Func2..)或托管我的 webjob 6 次。
这有可能吗?例如,在 webjob 启动期间做一些事情?
单个 Azure 函数不能链接到多个触发器,据我所知,WebJob SDK 也不能这样做。显然,您可以编写自己的 Web 作业(非 SDK),但我想这不是您要的。
最简单的方法可能是写一个辅助方法:
public void Impl(string data) { ... }
然后用不同的事件中心定义 6 个相同的函数:
public void Func1([EventHubTrigger("%hub1%", Connection = "Conn1")] string data)
=> Impl(data);
public void Func6([EventHubTrigger("%hub6%", Connection = "Conn6")] string data)
=> Impl(data);
您还可以在 6 个文件夹中手动创建 6 个 function.json
文件,并使它们指向完全相同的 Impl
函数,但具有不同的事件中心设置。
我知道这个帖子太旧了,无法回答,但最近我在处理类似的需求(其中要求我们在引入新客户时为每个客户动态添加一个 EventHub)。
我通过开发 WebJob 解决了这个问题,它每小时从数据库中提取 EventHub 连接详细信息(当然还有它的主要功能)。如果在数据库中添加了任何新的 EventHub,那么它只会为该 EventHub 生成一个新线程。
此外,如果任何 EventHub 已从数据库中删除,则它会终止该特定线程。
我通过使用静态列表实现了这一点,每个 eventhub 都有唯一的 ID。
示例代码:
private static List<Guid> _existingEventHubs; // static list consisting of unique identifier for each EventHub. It will be empty at first.
List<EventHubMappingDetails> eventHubDetails = // a database call here. This list will have eventHub connection details.
if (eventHubDetails.Any())
{
//Logic to create new ConsumerRead Thread only for newly created event hubs and to kill the threads if the eventhub has been removed.
if (_existingEventHubs.Count > 0) // check if there are already any entries available in the list
{
//addDetails list contains the eventHub details for newly added eventHubs
List<EventHubMapping> addDetails = eventHubDetails.Where(_ => !_existingEventHubs.Contains(_.EHId)).ToList();
//removeDetails list contains eventhubMappingIds for the eventHubs which has been removed
List<Guid> removeDetails = _existingEventHubs.Where(_ => !eventHubDetails.Select(o => o.EHId).Contains(_)).ToList();
if (addDetails.Count > 0)
{
_existingEventHubs.AddRange(addDetails.Select(_ => _.EHId));
eventHubConnectionDetails = addDetails.Select(_ => new EventHubConnectionDetails()
{
connectionString = "Endpoint= + _.EHHostNameRead + ;SharedAccessKeyName= + _.EHSharedAccessKeyNameRead + ;SharedAccessKey= + _.EHSharedAccessKeyValueRead + ;EntityPath= + _.EHEntityPathRead",
counsumerGroup = // consumer Group Name here,
eventHubName = _.EHEntityPathRead,
ID = _.EHId.ToString()
}).ToList();
//Call to ConsumerReadEvent method to create new thread for all the newly created eventHubs
await _eventHubConsumerBusiness.ConsumerReadEvent(eventHubConnectionDetails);
}
if (removeDetails.Count > 0)
{
List<string> EHTobeRemovedList = removeDetails.Select(_ => _.ToString()).ToList();
_existingEventHubs.RemoveAll(_ => removeDetails.Contains(_));// remove all the EventHub unique identifiers from the static list if these are removed from data base
}
}
else
{
_existingEventHubs.AddRange(eventHubDetails.Select(_ => _.EHId));
eventHubConnectionDetails = eventHubDetails.Select(_ => new EventHubConnectionDetails()
{
connectionString = "Endpoint= + _.EHHostNameRead + ;SharedAccessKeyName= + _.EHSharedAccessKeyNameRead + ;SharedAccessKey= + _.EHSharedAccessKeyValueRead + ;EntityPath= + _.EHEntityPathRead",
counsumerGroup = // consumer Group Name here,
eventHubName = _.EHEntityPathRead,
ID = _.EHId.ToString()
}).ToList();
await _eventHubConsumerBusiness.ConsumerReadEvent(eventHubConnectionDetails);
}
}
然后为 eventHubConnectionDetails
列表的每个 eventHub 添加 Parallel.ForEach
。
为每个线程添加以下代码,
EventProcessorClient processor = new EventProcessorClient(storageClient, eventHubConnectionDetails.consumerGroup, eventHubConnectionDetails.connectionString, eventHubConnectionDetails.eventHubName);
try
{
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
await processor.StartProcessingAsync();
//Logic to delay the StopProcessingAsync till _existingEventHubs list contains the eventHubMappingId for the currently running eventHub
while (_existingEventHubs.Contains(eventHubConnectionDetails.ID))
{
await Task.Delay(TimeSpan.FromMinutes(5));
}
await processor.StopProcessingAsync();
}
catch
{
}
我有多个位于不同 Azure 区域的 Azure 事件中心。我想编写一个 webjob/function 可以接收来自所有这些事件中心的消息,而无需硬编码要收听哪些 EH。
所以,通常你会有这样的东西,一个函数可以从配置中定义的一个 EH 接收消息:
public void Func([EventHubTrigger("%eventhubInName%")] string data)
就我而言,我有 6 个不同的 EventHub,分布在 6 个蔚蓝区域。我想要的是在启动时以某种方式设置我的函数以监听多个事件中心,这样我就不需要硬编码 6 个相同的函数(Func1、Func2..)或托管我的 webjob 6 次。
这有可能吗?例如,在 webjob 启动期间做一些事情?
单个 Azure 函数不能链接到多个触发器,据我所知,WebJob SDK 也不能这样做。显然,您可以编写自己的 Web 作业(非 SDK),但我想这不是您要的。
最简单的方法可能是写一个辅助方法:
public void Impl(string data) { ... }
然后用不同的事件中心定义 6 个相同的函数:
public void Func1([EventHubTrigger("%hub1%", Connection = "Conn1")] string data)
=> Impl(data);
public void Func6([EventHubTrigger("%hub6%", Connection = "Conn6")] string data)
=> Impl(data);
您还可以在 6 个文件夹中手动创建 6 个 function.json
文件,并使它们指向完全相同的 Impl
函数,但具有不同的事件中心设置。
我知道这个帖子太旧了,无法回答,但最近我在处理类似的需求(其中要求我们在引入新客户时为每个客户动态添加一个 EventHub)。
我通过开发 WebJob 解决了这个问题,它每小时从数据库中提取 EventHub 连接详细信息(当然还有它的主要功能)。如果在数据库中添加了任何新的 EventHub,那么它只会为该 EventHub 生成一个新线程。 此外,如果任何 EventHub 已从数据库中删除,则它会终止该特定线程。
我通过使用静态列表实现了这一点,每个 eventhub 都有唯一的 ID。
示例代码:
private static List<Guid> _existingEventHubs; // static list consisting of unique identifier for each EventHub. It will be empty at first.
List<EventHubMappingDetails> eventHubDetails = // a database call here. This list will have eventHub connection details.
if (eventHubDetails.Any())
{
//Logic to create new ConsumerRead Thread only for newly created event hubs and to kill the threads if the eventhub has been removed.
if (_existingEventHubs.Count > 0) // check if there are already any entries available in the list
{
//addDetails list contains the eventHub details for newly added eventHubs
List<EventHubMapping> addDetails = eventHubDetails.Where(_ => !_existingEventHubs.Contains(_.EHId)).ToList();
//removeDetails list contains eventhubMappingIds for the eventHubs which has been removed
List<Guid> removeDetails = _existingEventHubs.Where(_ => !eventHubDetails.Select(o => o.EHId).Contains(_)).ToList();
if (addDetails.Count > 0)
{
_existingEventHubs.AddRange(addDetails.Select(_ => _.EHId));
eventHubConnectionDetails = addDetails.Select(_ => new EventHubConnectionDetails()
{
connectionString = "Endpoint= + _.EHHostNameRead + ;SharedAccessKeyName= + _.EHSharedAccessKeyNameRead + ;SharedAccessKey= + _.EHSharedAccessKeyValueRead + ;EntityPath= + _.EHEntityPathRead",
counsumerGroup = // consumer Group Name here,
eventHubName = _.EHEntityPathRead,
ID = _.EHId.ToString()
}).ToList();
//Call to ConsumerReadEvent method to create new thread for all the newly created eventHubs
await _eventHubConsumerBusiness.ConsumerReadEvent(eventHubConnectionDetails);
}
if (removeDetails.Count > 0)
{
List<string> EHTobeRemovedList = removeDetails.Select(_ => _.ToString()).ToList();
_existingEventHubs.RemoveAll(_ => removeDetails.Contains(_));// remove all the EventHub unique identifiers from the static list if these are removed from data base
}
}
else
{
_existingEventHubs.AddRange(eventHubDetails.Select(_ => _.EHId));
eventHubConnectionDetails = eventHubDetails.Select(_ => new EventHubConnectionDetails()
{
connectionString = "Endpoint= + _.EHHostNameRead + ;SharedAccessKeyName= + _.EHSharedAccessKeyNameRead + ;SharedAccessKey= + _.EHSharedAccessKeyValueRead + ;EntityPath= + _.EHEntityPathRead",
counsumerGroup = // consumer Group Name here,
eventHubName = _.EHEntityPathRead,
ID = _.EHId.ToString()
}).ToList();
await _eventHubConsumerBusiness.ConsumerReadEvent(eventHubConnectionDetails);
}
}
然后为 eventHubConnectionDetails
列表的每个 eventHub 添加 Parallel.ForEach
。
为每个线程添加以下代码,
EventProcessorClient processor = new EventProcessorClient(storageClient, eventHubConnectionDetails.consumerGroup, eventHubConnectionDetails.connectionString, eventHubConnectionDetails.eventHubName);
try
{
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
await processor.StartProcessingAsync();
//Logic to delay the StopProcessingAsync till _existingEventHubs list contains the eventHubMappingId for the currently running eventHub
while (_existingEventHubs.Contains(eventHubConnectionDetails.ID))
{
await Task.Delay(TimeSpan.FromMinutes(5));
}
await processor.StopProcessingAsync();
}
catch
{
}