How to read from multiple EventHub partitions simultaneously with high throughput?

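One of my role instances needs to read from 20-40 EventHub partitions at the same time (context: this is our internal virtual partitioning scheme, where the 20-40 partitions represent a scale-out unit).

In my prototype I use the code below. With it I get a maximum throughput of 8 MBPS. When I run the same console app multiple times, my throughput (perfmon counter) multiplies accordingly, so I think this is neither a VM network limit nor an EventHub service-side limit.

I wonder whether I am creating the clients correctly here...

Thanks! Zaki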

        const string EventHubName = "...";
        const string ConsumerGroupName = "...";

        var connectionStringBuilder = new ServiceBusConnectionStringBuilder();
        connectionStringBuilder.SharedAccessKeyName = "...";
        connectionStringBuilder.SharedAccessKey = "...";
        connectionStringBuilder.Endpoints.Add(new Uri("sb://....servicebus.windows.net/"));
        connectionStringBuilder.TransportType = TransportType.Amqp;

        var clientConnectionString = connectionStringBuilder.ToString();
        var eventHubClient = EventHubClient.CreateFromConnectionString(clientConnectionString, EventHubName);

        var runtimeInformation = await eventHubClient.GetRuntimeInformationAsync().ConfigureAwait(false);

        var consumerGroup = eventHubClient.GetConsumerGroup(ConsumerGroupName);

        var offStart = DateTime.UtcNow.AddMinutes(-10);
        var offEnd = DateTime.UtcNow.AddMinutes(-8);

        var workUnitManager = new WorkUnitManager(runtimeInformation.PartitionCount);

        var readers = new List<PartitionReader>();
        for (int i = 0; i < runtimeInformation.PartitionCount; i++)
        {
            var reader = new PartitionReader(
                consumerGroup,
                runtimeInformation.PartitionIds[i],
                i,
                offStart,
                offEnd,
                workUnitManager);
            readers.Add(reader);
        }

    internal async Task Read()
    {
        try
        {
            Console.WriteLine("Creating a receiver for '{0}' with offset {1}", this.partitionId, this.startOffset);
            EventHubReceiver receiver = await this.consumerGroup.CreateReceiverAsync(this.partitionId, this.startOffset).ConfigureAwait(false);
            Console.WriteLine("Receiver for '{0}' has been created.", this.partitionId);

            var stopWatch = new Stopwatch();
            stopWatch.Start();

            while (true)
            {
                var message =
                    (await receiver.ReceiveAsync(1, TimeSpan.FromSeconds(10)).ConfigureAwait(false)).FirstOrDefault();
                if (message == null)
                {
                    continue;
                }

                if (message.EnqueuedTimeUtc >= this.endOffset)
                {
                    break;
                }

                this.processor.Push(this.partitionIndex, message);
            }

            this.Duration = TimeSpan.FromMilliseconds(stopWatch.ElapsedMilliseconds);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
            throw;
        }
    }

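The code snippet you provided above effectively creates 1 connection to the ServiceBus service and then runs all receivers on that one connection (at the protocol level, it essentially creates multiple AMQP links on the same connection).

Alternatively, to achieve high throughput for receive operations, you need to create multiple connections and fine-tune your throughput through the ratio of receivers to connections. That is exactly what happens when you run the above code in multiple processes.

Here's how:

You need to go one level deeper in the .NET client SDK API and code at the MessagingFactory level. You can start with 1 MessagingFactory per EventHubClient. A MessagingFactory is the object that represents 1 connection to the EventHubs service. Code to create a dedicated connection per EventHubClient: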


    var connStr = new ServiceBusConnectionStringBuilder("Endpoint=sb://servicebusnamespacename.servicebus.windows.net/;SharedAccessKeyName=saskeyname;SharedAccessKey=sakKey");
    connStr.TransportType = TransportType.Amqp;
    var msgFactory = MessagingFactory.CreateFromConnectionString(connStr.ToString());
    var ehClient = msgFactory.CreateEventHubClient("teststream");

  • I added connStr to the sample only to emphasize assigning TransportType to Amqp

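You will end up with multiple connections on outgoing port 5671.

If you rewrite your code with 1 MessagingFactory per EventHubClient (or a reasonable ratio), you are all set. In your code, you will need to move the EventHubClient creation into the Reader.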
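One way to keep the receiver-to-connection ratio tunable is a small pool that hands out connections round-robin by partition index. The sketch below is only an illustration: `ConnectionPool<T>` and `ForPartition` are hypothetical names, not part of the SDK, and the pool is kept generic so that it assumes nothing about `MessagingFactory` itself.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the Service Bus SDK): holds a fixed number
// of connections and maps each partition index onto one of them.
public sealed class ConnectionPool<T>
{
    private readonly List<T> connections = new List<T>();

    public ConnectionPool(int connectionCount, Func<T> createConnection)
    {
        if (connectionCount < 1) throw new ArgumentOutOfRangeException(nameof(connectionCount));
        if (createConnection == null) throw new ArgumentNullException(nameof(createConnection));

        // Create every connection up front so readers never race to create one.
        for (int i = 0; i < connectionCount; i++)
        {
            connections.Add(createConnection());
        }
    }

    public int Count => connections.Count;

    // Round-robin mapping: with 40 partitions and 10 connections,
    // each connection ends up carrying 4 receivers.
    public T ForPartition(int partitionIndex)
    {
        if (partitionIndex < 0) throw new ArgumentOutOfRangeException(nameof(partitionIndex));
        return connections[partitionIndex % connections.Count];
    }
}
```

With the SDK calls shown above, the pool would presumably be built as `new ConnectionPool<MessagingFactory>(n, () => MessagingFactory.CreateFromConnectionString(connStr.ToString()))`, and each reader would call `pool.ForPartition(i).CreateEventHubClient(EventHubName)` before getting its consumer group. Setting `n` to the partition count gives the 1:1 mapping; smaller values share one connection among several receivers.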

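The only extra criterion to consider when creating multiple connections is billing: only 100 connections are included (including senders and receivers) in the Basic SKU. I guess you are already on Standard (since you have >1 TU), which includes 1000 connections in the package, so there is no need to worry, but I am mentioning it just in case.

~Sree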

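A good option is to create one task per partition. Here is a copy of my implementation, which is able to process messages at a rate of 2.5k per second per partition. This rate will also depend on your downstream speed.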

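    // Assumes EventHubPartitionCount, cts (a CancellationTokenSource) and counter
    // are fields defined elsewhere in the class.
    static void EventReceiver()
    {
        // Partition ids run from 0 to EventHubPartitionCount - 1.
        for (int i = 0; i < EventHubPartitionCount; i++)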
        {
            Task.Factory.StartNew((state) =>
            {
                Console.WriteLine("Starting worker to process partition: {0}", state);

                var factory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb", "tests-eventhub", ""), new MessagingFactorySettings()
                {
                    TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("Listen", "PGSVA7L="),
                    TransportType = TransportType.Amqp
                });

                var client = factory.CreateEventHubClient("eventHubName");
                var group = client.GetConsumerGroup("customConsumer");

                Console.WriteLine("Group: {0}", group.GroupName);

                // The starting point is interpreted as UTC, so pass UtcNow rather than local time.
                var receiver = group.CreateReceiver(state.ToString(), DateTime.UtcNow);


                while (true)
                {
                    if (cts.IsCancellationRequested)
                    {
                        receiver.Close();
                        break;
                    }

                    var messages = receiver.Receive(20);
                    messages.ToList().ForEach(aMessage =>
                    {                            
                         // Process your event
                    });

                    Console.WriteLine(counter);
                }
            }, i);
        }
    }
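The snippet above relies on `cts` and `counter` without showing them. A minimal sketch of that scaffolding might look like the following, assuming a stand-in delegate in place of the real receiver: `PartitionWorkers`, `Run` and `receiveBatch` are hypothetical names, and `receiveBatch` plays the role of `receiver.Receive(20)`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical scaffold: one long-running task per partition, a shared
// cancellation token, and a thread-safe message counter.
public static class PartitionWorkers
{
    // receiveBatch stands in for receiver.Receive(20): given a partition id,
    // it returns the next batch of messages (possibly empty).
    public static long Run(
        IEnumerable<string> partitionIds,
        Func<string, IEnumerable<string>> receiveBatch,
        CancellationToken token)
    {
        long counter = 0;

        var tasks = partitionIds.Select(partitionId => Task.Factory.StartNew(() =>
        {
            // Each worker loops until cancellation is requested.
            while (!token.IsCancellationRequested)
            {
                foreach (var message in receiveBatch(partitionId))
                {
                    // Process the event here, then count it. Interlocked keeps
                    // the count correct when several workers update it.
                    Interlocked.Increment(ref counter);
                }
            }
        }, TaskCreationOptions.LongRunning)).ToArray();

        // Blocks until every worker has observed the cancellation.
        Task.WaitAll(tasks);
        return Interlocked.Read(ref counter);
    }
}
```

`TaskCreationOptions.LongRunning` hints to the scheduler that these loops should not occupy ordinary thread-pool workers, which matters once there are 20-40 of them.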