How to read from multiple EventHub partitions simultaneously with high throughput?
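One of my role instances needs to read from 20-40 EventHub partitions at the same time (context: this is our internal virtual partitioning scheme; the 20-40 partitions represent a scale-out unit).
In my prototype I use the code below. With it I get a maximum throughput of 8 MBPS. When I run the same console app several times, my throughput (perfmon counters) multiplies accordingly, so I think this is neither a VM network limit nor an EventHub service-side limit.
I wonder whether I am creating the client correctly here...
Thanks!
Zaki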
    const string EventHubName = "...";
    const string ConsumerGroupName = "...";
    var connectionStringBuilder = new ServiceBusConnectionStringBuilder();
    connectionStringBuilder.SharedAccessKeyName = "...";
    connectionStringBuilder.SharedAccessKey = "...";
    connectionStringBuilder.Endpoints.Add(new Uri("sb://....servicebus.windows.net/"));
    connectionStringBuilder.TransportType = TransportType.Amqp;
    var clientConnectionString = connectionStringBuilder.ToString();
    var eventHubClient = EventHubClient.CreateFromConnectionString(clientConnectionString, EventHubName);
    var runtimeInformation = await eventHubClient.GetRuntimeInformationAsync().ConfigureAwait(false);
    var consumerGroup = eventHubClient.GetConsumerGroup(ConsumerGroupName);
    var offStart = DateTime.UtcNow.AddMinutes(-10);
    var offEnd = DateTime.UtcNow.AddMinutes(-8);
    var workUnitManager = new WorkUnitManager(runtimeInformation.PartitionCount);
    var readers = new List<PartitionReader>();
    for (int i = 0; i < runtimeInformation.PartitionCount; i++)
    {
        var reader = new PartitionReader(
            consumerGroup,
            runtimeInformation.PartitionIds[i],
            i,
            offStart,
            offEnd,
            workUnitManager);
        readers.Add(reader);
    }
    internal async Task Read()
    {
        try
        {
            Console.WriteLine("Creating a receiver for '{0}' with offset {1}", this.partitionId, this.startOffset);
            EventHubReceiver receiver = await this.consumerGroup.CreateReceiverAsync(this.partitionId, this.startOffset).ConfigureAwait(false);
            Console.WriteLine("Receiver for '{0}' has been created.", this.partitionId);
            var stopWatch = new Stopwatch();
            stopWatch.Start();
            while (true)
            {
                // Requests a single event per call and waits up to 10 seconds for it.
                var message =
                    (await receiver.ReceiveAsync(1, TimeSpan.FromSeconds(10)).ConfigureAwait(false)).FirstOrDefault();
                if (message == null)
                {
                    continue;
                }
                // Stop once events are newer than the end of the requested window.
                if (message.EnqueuedTimeUtc >= this.endOffset)
                {
                    break;
                }
                this.processor.Push(this.partitionIndex, message);
            }
            this.Duration = TimeSpan.FromMilliseconds(stopWatch.ElapsedMilliseconds);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
            throw;
        }
    }
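The code snippet you provided effectively creates 1 connection to the ServiceBus service and then runs all receivers on that one connection (at the protocol level, it essentially creates multiple AMQP links on the same connection).
Alternatively, to achieve high throughput for receive operations, you need to create multiple connections and map your receivers to connections at a ratio that you tune for your throughput. This is what happens when you run the above code in multiple processes.
Here's how:
You need to go one level deeper into the .NET client SDK API and code at the MessagingFactory level. You can start with 1 MessagingFactory per EventHubClient. A MessagingFactory represents 1 connection to the EventHubs service. Code to create a dedicated connection per EventHubClient: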
    var connStr = new ServiceBusConnectionStringBuilder("Endpoint=sb://servicebusnamespacename.servicebus.windows.net/;SharedAccessKeyName=saskeyname;SharedAccessKey=sakKey");
    connStr.TransportType = TransportType.Amqp;
    var msgFactory = MessagingFactory.CreateFromConnectionString(connStr.ToString());
    var ehClient = msgFactory.CreateEventHubClient("teststream");
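- I only added connStr to the sample to emphasize setting TransportType to Amqp.
You will end up with multiple connections to outgoing port 5671.
If you rewrite your code with 1 MessagingFactory per EventHubClient (or a reasonable ratio), you are all set (in your code, you need to move the EventHubClient creation into the Reader)!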
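To make the "receivers per connection" ratio concrete, here is a minimal sketch of how the partitions could be grouped before any connections are created. ConnectionPlanner and Plan are hypothetical names of my own, not part of the SDK, and the code only computes the grouping; it does not touch the service.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of the Service Bus SDK): decides which
// partitions should share a connection for a given ratio.
public static class ConnectionPlanner
{
    // Splits the partition IDs into consecutive buckets of at most
    // receiversPerConnection entries. Each bucket is meant to share one
    // connection, that is, one MessagingFactory.
    public static IReadOnlyList<IReadOnlyList<string>> Plan(
        IEnumerable<string> partitionIds, int receiversPerConnection)
    {
        if (partitionIds == null)
            throw new ArgumentNullException(nameof(partitionIds));
        if (receiversPerConnection < 1)
            throw new ArgumentOutOfRangeException(nameof(receiversPerConnection));

        return partitionIds
            .Select((id, index) => new { id, index })
            .GroupBy(x => x.index / receiversPerConnection)
            .Select(g => (IReadOnlyList<string>)g.Select(x => x.id).ToList())
            .ToList();
    }
}
```

For each bucket you would then create one MessagingFactory and one EventHubClient as in the snippet above, and create the receivers for that bucket's partitions from that client. A ratio of 1 gives every partition its own connection; larger values trade throughput for fewer connections, and the right value is something to measure rather than assume.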
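The only additional criterion to consider when creating multiple connections is billing: only 100 connections (including senders and receivers) are included in the Basic SKU. I guess you are already on Standard (since you have >1 TU), which includes 1000 connections in the package, so no need to worry, but I am mentioning it just in case.
~Sri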
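A good option is to create one Task per partition.
Here is a copy of my implementation, which is able to process messages at a rate of 2.5k per second per partition. This rate also depends on your downstream speed.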
    // Assumes cts (a CancellationTokenSource), counter and EventHubPartitionCount
    // are fields defined elsewhere in the class.
    static void EventReceiver()
    {
        // Partition IDs run from 0 to EventHubPartitionCount - 1.
        for (int i = 0; i < EventHubPartitionCount; i++)
        {
            Task.Factory.StartNew((state) =>
            {
                Console.WriteLine("Starting worker to process partition: {0}", state);
                // One MessagingFactory (one connection) per partition worker.
                var factory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb", "tests-eventhub", ""), new MessagingFactorySettings()
                {
                    TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("Listen", "PGSVA7L="),
                    TransportType = TransportType.Amqp
                });
                var client = factory.CreateEventHubClient("eventHubName");
                var group = client.GetConsumerGroup("customConsumer");
                Console.WriteLine("Group: {0}", group.GroupName);
                // The starting point is a UTC timestamp: read only events enqueued from now on.
                var receiver = group.CreateReceiver(state.ToString(), DateTime.UtcNow);
                while (true)
                {
                    if (cts.IsCancellationRequested)
                    {
                        receiver.Close();
                        break;
                    }
                    // Ask for up to 20 events per call.
                    var messages = receiver.Receive(20);
                    messages.ToList().ForEach(aMessage =>
                    {
                        // Process your event
                    });
                    Console.WriteLine(counter);
                }
            }, i);
        }
    }
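If you also want to wait for the workers or observe their failures, the one-task-per-partition idea can be sketched as below. PartitionWorkers and RunAllAsync are hypothetical names, and readPartition stands in for whatever receive loop you use (for example the body of the loop above); nothing here calls the Event Hubs SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: starts one worker task per partition and returns a
// task that completes when every worker has finished.
public static class PartitionWorkers
{
    public static Task RunAllAsync(
        IEnumerable<string> partitionIds,
        Func<string, CancellationToken, Task> readPartition,
        CancellationToken token)
    {
        if (partitionIds == null)
            throw new ArgumentNullException(nameof(partitionIds));
        if (readPartition == null)
            throw new ArgumentNullException(nameof(readPartition));

        // Task.Run schedules each partition's loop on the thread pool;
        // the token lets the caller stop all workers together.
        var workers = partitionIds
            .Select(id => Task.Run(() => readPartition(id, token), token))
            .ToList();

        // The combined task also surfaces any exception a worker throws.
        return Task.WhenAll(workers);
    }
}
```

The caller keeps the returned task and awaits it at shutdown after cancelling the token, which avoids the fire-and-forget behaviour of the loop above.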