如何获取 Azure EventHub 深度
How to get Azure EventHub Depth
我的 EventHub 每天接收数百万条消息。我正在处理来自 Azure Function 的这些消息,并在日志中打印偏移量和序列数值。
public static async Task Run([EventHubTrigger("%EventHub%", Connection = "EventHubConnection", ConsumerGroup = "%EventHubConsumerGroup%")]EventData eventMessage,
[Inject]ITsfService tsfService, [Inject]ILog log)
{
log.Info($"PartitionKey {eventMessage.PartitionKey}, Offset {eventMessage.Offset} and SequenceNumber {eventMessage.SequenceNumber}");
}
日志输出
PartitionKey,偏移量 78048157161248 和 SequenceNumber 442995283
问题
PartitionKey 值为空?我在那个 EventHub
中有 2 个分区
有什么方法可以检查积压吗?某个时间点我想知道我的函数需要处理多少消息。
是的,您可以将 PartitionContext 对象作为签名的一部分,这将为您提供一些额外的信息,
public static async Task Run([EventHubTrigger("HubName",
Connection = "EventHubConnectionStringSettingName",
ConsumerGroup = "Consumer-Group-If-Applicable")] EventData[] messageBatch, PartitionContext partitionContext, ILogger log)
编辑您的 host.json 并将 enableReceiverRuntimeMetric 设置为 true,例如
"version": "2.0",
"extensions": {
"eventHubs": {
"batchCheckpointFrequency": 100,
"eventProcessorOptions": {
"maxBatchSize": 256,
"prefetchCount": 512,
"enableReceiverRuntimeMetric": true
}
}
}
您现在可以访问 PartitionContext 上的 RuntimeInformation,它有一些关于 LastSequenceNumber 的信息,并且您当前的消息有它自己的序列号,因此您可以使用它们之间的差异来计算度量,例如,
public class EventStreamBacklogTracing
{
private static readonly Metric PartitionSequenceMetric =
InsightsClient.Instance.GetMetric("PartitionSequenceDifference", "PartitionId", "ConsumerGroupName", "EventHubPath");
public static void LogSequenceDifference(EventData message, PartitionContext context)
{
var messageSequence = message.SystemProperties.SequenceNumber;
var lastEnqueuedSequence = context.RuntimeInformation.LastSequenceNumber;
var sequenceDifference = lastEnqueuedSequence - messageSequence;
PartitionSequenceMetric.TrackValue(sequenceDifference, context.PartitionId, context.ConsumerGroupName,
context.EventHubPath);
}
}
我在 medium 上写了一篇文章,更详细地展示了您如何使用 grafana 中的数据,
https://medium.com/@dylanm_asos/azure-functions-event-hub-processing-8a3f39d2cd0f
PartitionKey value blank? I have 2 partitions in that EventHub
分区键与分区 ID 不同。将事件发布到事件中心时,可以设置分区键。如果没有设置那个分区键,那么当你去消费它的时候它会是空的。
分区键适用于您不关心它最终位于哪个分区的事件,只是您希望具有相同键的事件最终位于同一分区中。
例如,如果您有数百个 IoT 设备传输遥测数据。您不关心这些物联网设备将数据发布到哪个分区,只要它总是在同一个分区中结束即可。您可以将分区键设置为物联网设备的序列号。
当该设备使用该键发布其事件数据时,事件中心服务将为该分区键计算哈希,将其映射到特定的事件中心分区,并将具有该键的任何事件路由到同一分区。
"Event Hubs Features: Publishing an Event" 中的文档描述得很好。
我的 EventHub 每天接收数百万条消息。我正在处理来自 Azure Function 的这些消息,并在日志中打印偏移量和序列数值。
public static async Task Run([EventHubTrigger("%EventHub%", Connection = "EventHubConnection", ConsumerGroup = "%EventHubConsumerGroup%")]EventData eventMessage,
[Inject]ITsfService tsfService, [Inject]ILog log)
{
log.Info($"PartitionKey {eventMessage.PartitionKey}, Offset {eventMessage.Offset} and SequenceNumber {eventMessage.SequenceNumber}");
}
日志输出
PartitionKey,偏移量 78048157161248 和 SequenceNumber 442995283
问题
PartitionKey 值为空?我在那个 EventHub
中有 2 个分区
有什么方法可以检查积压吗?某个时间点我想知道我的函数需要处理多少消息。
是的,您可以将 PartitionContext 对象作为签名的一部分,这将为您提供一些额外的信息,
public static async Task Run([EventHubTrigger("HubName",
Connection = "EventHubConnectionStringSettingName",
ConsumerGroup = "Consumer-Group-If-Applicable")] EventData[] messageBatch, PartitionContext partitionContext, ILogger log)
编辑您的 host.json 并将 enableReceiverRuntimeMetric 设置为 true,例如
"version": "2.0",
"extensions": {
"eventHubs": {
"batchCheckpointFrequency": 100,
"eventProcessorOptions": {
"maxBatchSize": 256,
"prefetchCount": 512,
"enableReceiverRuntimeMetric": true
}
}
}
您现在可以访问 PartitionContext 上的 RuntimeInformation,它有一些关于 LastSequenceNumber 的信息,并且您当前的消息有它自己的序列号,因此您可以使用它们之间的差异来计算度量,例如,
public class EventStreamBacklogTracing
{
private static readonly Metric PartitionSequenceMetric =
InsightsClient.Instance.GetMetric("PartitionSequenceDifference", "PartitionId", "ConsumerGroupName", "EventHubPath");
public static void LogSequenceDifference(EventData message, PartitionContext context)
{
var messageSequence = message.SystemProperties.SequenceNumber;
var lastEnqueuedSequence = context.RuntimeInformation.LastSequenceNumber;
var sequenceDifference = lastEnqueuedSequence - messageSequence;
PartitionSequenceMetric.TrackValue(sequenceDifference, context.PartitionId, context.ConsumerGroupName,
context.EventHubPath);
}
}
我在 medium 上写了一篇文章,更详细地展示了您如何使用 grafana 中的数据,
https://medium.com/@dylanm_asos/azure-functions-event-hub-processing-8a3f39d2cd0f
PartitionKey value blank? I have 2 partitions in that EventHub
分区键与分区 ID 不同。将事件发布到事件中心时,可以设置分区键。如果没有设置那个分区键,那么当你去消费它的时候它会是空的。
分区键适用于您不关心它最终位于哪个分区的事件,只是您希望具有相同键的事件最终位于同一分区中。
例如,如果您有数百个 IoT 设备传输遥测数据。您不关心这些物联网设备将数据发布到哪个分区,只要它总是在同一个分区中结束即可。您可以将分区键设置为物联网设备的序列号。 当该设备使用该键发布其事件数据时,事件中心服务将为该分区键计算哈希,将其映射到特定的事件中心分区,并将具有该键的任何事件路由到同一分区。
"Event Hubs Features: Publishing an Event" 中的文档描述得很好。