Azure 事件中心和多个使用者组
Azure event hubs and multiple consumer groups
需要有关在以下情况下使用 Azure 事件中心的帮助。我认为消费者群体可能是这种情况下的正确选择,但我无法在网上找到具体示例。
这是问题的粗略描述和使用事件中心的建议解决方案(我不确定这是否是最佳解决方案。感谢您的反馈)
我有多个 event-sources 生成大量事件数据(来自传感器的遥测数据)需要保存到我们的数据库和一些分析,例如 运行 平均值,min-max应 并行执行 .
发送方只能将数据发送到单个端点,但 event-hub 应该使该数据可供两个数据处理程序使用。
我正在考虑使用两个消费者组,第一个是一组工作者角色实例,负责将数据保存到我们的 key-value 存储中,第二个消费者组将是一个分析引擎(可能与 Azure 流分析一起使用)。
首先,我如何设置消费者组,我是否需要在 sender/receiver 方面做些什么,以便事件的副本出现在所有消费者组中?
我确实在网上阅读了很多示例,但他们要么使用 client.GetDefaultConsumerGroup();
and/or 让所有分区都由同一工作者角色的多个实例处理。
对于我的场景,当一个事件被触发时,它需要由两个不同的工作角色并行处理(一个保存数据,第二个做一些分析)
谢谢!
TLDR:看起来很合理,只需使用不同名称的 CreateConsumerGroupIfNotExists 创建两个消费者组即可。
Consumer Groups 主要是一个概念,因此它们的具体工作方式取决于您的订阅者的实施方式。如您所知,从概念上讲,它们是一组一起工作的订阅者,因此每个组都会收到所有消息,并且在理想(不会发生)的情况下可能会使用每条消息一次。这意味着每个消费者组 将 "have all partitions processed by multiple instances of the same worker role." 你想要这个。
这可以通过不同的方式实现。 Microsoft 提供了两种直接使用来自事件中心的消息的方法,以及使用可能构建在这两种直接方法之上的流分析之类的选项。第一种方式是 Event Hub Receiver, the second which is higher level is the Event Processor Host.
我没用过Event Hub Receiver directly so this particular comment is based on the theory of how these sorts of systems work and speculation from the documentation: While they are created from EventHubConsumerGroups this serves little purpose as these receivers do not coordinate with one another. If you use these you will need to (and can!) do all the coordination and committing of offsets yourself which has advantages in some scenarios such as writing the offset to a transactional DB in the same transaction as computed aggregates. Using these low level receivers, having different logical consumer groups using the same Azure consumer group probably shouldn't (normative not practical advice) be particularly problematic, but you should use different names in case it either does matter or you change to EventProcessorHosts.
现在介绍更多有用的信息,EventProcessorHosts are probably built on top of EventHubReceivers. They are a higher level thing and there is support to enable multiple machines to work together as a logical consumer group. Below I've included a lightly edited snippet from my code that makes an EventProcessorHost 留下一堆评论来解释一些选择。
//We need an identifier for the lease. It must be unique across concurrently
//running instances of the program. There are three main options for this. The
//first is a static value from a config file. The second is the machine's NETBIOS
//name ie System.Environment.MachineName. The third is a random value unique per run which
//we have chosen here, if our VMs have very weak randomness bad things may happen.
string hostName = Guid.NewGuid().ToString();
//It's not clear if we want this here long term or if we prefer that the Consumer
//Groups be created out of band. Nor are there necessarily good tools to discover
//existing consumer groups.
NamespaceManager namespaceManager =
NamespaceManager.CreateFromConnectionString(eventHubConnectionString);
EventHubDescription ehd = namespaceManager.GetEventHub(eventHubPath);
namespaceManager.CreateConsumerGroupIfNotExists(ehd.Path, consumerGroupName);
host = new EventProcessorHost(hostName, eventHubPath, consumerGroupName,
eventHubConnectionString, storageConnectionString, leaseContainerName);
//Call something like this when you want it to start
host.RegisterEventProcessorFactoryAsync(factory)
你会注意到我告诉 Azure 如果它不存在就创建一个新的消费者组,如果它不存在你会收到一条可爱的错误消息。老实说,我不知道这样做的目的是什么,因为它不包括存储连接字符串,需要在实例之间保持相同,以便 EventProcessorHost 的协调(并且可能提交) 才能正常工作。
我在这里提供了一张来自 Azure Storage Explorer 的租约图片,大概是我在 11 月份试验的消费者组的抵消。请注意,虽然我有一个 testhub 和一个 testhub-testcg 容器,但这是由于手动命名它们。如果它们在同一个容器中,则类似于“$Default/0”与 "testcg/0"。
如您所见,每个分区有一个 blob。我的假设是这些斑点用于两件事。第一个是用于在实例之间分配分区的 Blob 租约,请参阅 here,第二个是在已提交的分区内存储偏移量。
消费实例不是将数据推送到消费者组,而是向存储系统请求一个分区中某个偏移量处的数据。 EventProcessorHosts 是一种很好的高级逻辑消费者组方式,其中每个分区一次只能由一个消费者读取,并且不会忘记逻辑消费者组在每个分区中取得的进展。
请记住,每个分区的吞吐量是经过测量的,因此如果您正在最大化入口,您只能有两个逻辑消费者,它们都达到了速度。因此,您需要确保有足够的分区和吞吐量单位,您可以:
- 读取您发送的所有数据。
- 如果您因问题落后了几个小时,请在 24 小时保留期内赶上。
总而言之:消费者群体是你所需要的。您阅读的使用特定消费者组的示例很好,在每个逻辑消费者组中,Azure 消费者组使用相同的名称,不同的逻辑消费者组使用不同的名称。
我还没有使用过 Azure 流分析,但至少在预览版期间你是 limited to the default consumer group。因此,不要将默认使用者组用于其他用途,如果您需要两个独立的 Azure 流分析,您可能需要做一些令人讨厌的事情。但它很容易配置!
需要有关在以下情况下使用 Azure 事件中心的帮助。我认为消费者群体可能是这种情况下的正确选择,但我无法在网上找到具体示例。
这是问题的粗略描述和使用事件中心的建议解决方案(我不确定这是否是最佳解决方案。感谢您的反馈)
我有多个 event-sources 生成大量事件数据(来自传感器的遥测数据)需要保存到我们的数据库和一些分析,例如 运行 平均值,min-max应 并行执行 .
发送方只能将数据发送到单个端点,但 event-hub 应该使该数据可供两个数据处理程序使用。
我正在考虑使用两个消费者组,第一个是一组工作者角色实例,负责将数据保存到我们的 key-value 存储中,第二个消费者组将是一个分析引擎(可能与 Azure 流分析一起使用)。
首先,我如何设置消费者组,我是否需要在 sender/receiver 方面做些什么,以便事件的副本出现在所有消费者组中?
我确实在网上阅读了很多示例,但他们要么使用 client.GetDefaultConsumerGroup();
and/or 让所有分区都由同一工作者角色的多个实例处理。
对于我的场景,当一个事件被触发时,它需要由两个不同的工作角色并行处理(一个保存数据,第二个做一些分析)
谢谢!
TLDR:看起来很合理,只需使用不同名称的 CreateConsumerGroupIfNotExists 创建两个消费者组即可。
Consumer Groups 主要是一个概念,因此它们的具体工作方式取决于您的订阅者的实施方式。如您所知,从概念上讲,它们是一组一起工作的订阅者,因此每个组都会收到所有消息,并且在理想(不会发生)的情况下可能会使用每条消息一次。这意味着每个消费者组 将 "have all partitions processed by multiple instances of the same worker role." 你想要这个。
这可以通过不同的方式实现。 Microsoft 提供了两种直接使用来自事件中心的消息的方法,以及使用可能构建在这两种直接方法之上的流分析之类的选项。第一种方式是 Event Hub Receiver, the second which is higher level is the Event Processor Host.
我没用过Event Hub Receiver directly so this particular comment is based on the theory of how these sorts of systems work and speculation from the documentation: While they are created from EventHubConsumerGroups this serves little purpose as these receivers do not coordinate with one another. If you use these you will need to (and can!) do all the coordination and committing of offsets yourself which has advantages in some scenarios such as writing the offset to a transactional DB in the same transaction as computed aggregates. Using these low level receivers, having different logical consumer groups using the same Azure consumer group probably shouldn't (normative not practical advice) be particularly problematic, but you should use different names in case it either does matter or you change to EventProcessorHosts.
现在介绍更多有用的信息,EventProcessorHosts are probably built on top of EventHubReceivers. They are a higher level thing and there is support to enable multiple machines to work together as a logical consumer group. Below I've included a lightly edited snippet from my code that makes an EventProcessorHost 留下一堆评论来解释一些选择。
//We need an identifier for the lease. It must be unique across concurrently
//running instances of the program. There are three main options for this. The
//first is a static value from a config file. The second is the machine's NETBIOS
//name ie System.Environment.MachineName. The third is a random value unique per run which
//we have chosen here, if our VMs have very weak randomness bad things may happen.
string hostName = Guid.NewGuid().ToString();
//It's not clear if we want this here long term or if we prefer that the Consumer
//Groups be created out of band. Nor are there necessarily good tools to discover
//existing consumer groups.
NamespaceManager namespaceManager =
NamespaceManager.CreateFromConnectionString(eventHubConnectionString);
EventHubDescription ehd = namespaceManager.GetEventHub(eventHubPath);
namespaceManager.CreateConsumerGroupIfNotExists(ehd.Path, consumerGroupName);
host = new EventProcessorHost(hostName, eventHubPath, consumerGroupName,
eventHubConnectionString, storageConnectionString, leaseContainerName);
//Call something like this when you want it to start
host.RegisterEventProcessorFactoryAsync(factory)
你会注意到我告诉 Azure 如果它不存在就创建一个新的消费者组,如果它不存在你会收到一条可爱的错误消息。老实说,我不知道这样做的目的是什么,因为它不包括存储连接字符串,需要在实例之间保持相同,以便 EventProcessorHost 的协调(并且可能提交) 才能正常工作。
我在这里提供了一张来自 Azure Storage Explorer 的租约图片,大概是我在 11 月份试验的消费者组的抵消。请注意,虽然我有一个 testhub 和一个 testhub-testcg 容器,但这是由于手动命名它们。如果它们在同一个容器中,则类似于“$Default/0”与 "testcg/0"。
如您所见,每个分区有一个 blob。我的假设是这些斑点用于两件事。第一个是用于在实例之间分配分区的 Blob 租约,请参阅 here,第二个是在已提交的分区内存储偏移量。
消费实例不是将数据推送到消费者组,而是向存储系统请求一个分区中某个偏移量处的数据。 EventProcessorHosts 是一种很好的高级逻辑消费者组方式,其中每个分区一次只能由一个消费者读取,并且不会忘记逻辑消费者组在每个分区中取得的进展。
请记住,每个分区的吞吐量是经过测量的,因此如果您正在最大化入口,您只能有两个逻辑消费者,它们都达到了速度。因此,您需要确保有足够的分区和吞吐量单位,您可以:
- 读取您发送的所有数据。
- 如果您因问题落后了几个小时,请在 24 小时保留期内赶上。
总而言之:消费者群体是你所需要的。您阅读的使用特定消费者组的示例很好,在每个逻辑消费者组中,Azure 消费者组使用相同的名称,不同的逻辑消费者组使用不同的名称。
我还没有使用过 Azure 流分析,但至少在预览版期间你是 limited to the default consumer group。因此,不要将默认使用者组用于其他用途,如果您需要两个独立的 Azure 流分析,您可能需要做一些令人讨厌的事情。但它很容易配置!