从 EventHub 获取数据延迟

Getting Data from EventHub is delayed

我在Azure中配置了一个EventHub,也是一个读取数据的消费组。它工作了几天。突然,我发现传入数据有延迟(大约 3 天)。我使用 Windows 服务来使用我服务器中的数据。我每分钟大约有 500 条传入消息。谁能帮我解决这个问题?

可能是您处理它们的速度太慢了。所以要做的工作越来越多,你就会落后。

要深入了解您在事件流中的位置,您可以使用如下代码:

private void LogProgressRecord(PartitionContext context)
{
    if (namespaceManager == null)
        return;

    var currentSeqNo = context.Lease.SequenceNumber;
    var lastSeqNo = namespaceManager.GetEventHubPartition(context.EventHubPath, context.ConsumerGroupName, context.Lease.PartitionId).EndSequenceNumber;
    var delta = lastSeqNo - currentSeqNo;

    logWriter.Write(
            $"Last processed seqnr for partition {context.Lease.PartitionId}: {currentSeqNo} of {lastSeqNo} in consumergroup '{context.ConsumerGroupName}' (lag: {delta})",
            EventLevel.Informational);
}

命名空间管理器是这样构建的:

namespaceManager = NamespaceManager.CreateFromConnectionString("Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=yyy;SharedAccessKey=zzz");

我在CloseAsync方法中调用了这个日志记录方法:

public Task CloseAsync(PartitionContext context, CloseReason reason)
{
    LogProgressRecord(context);

    return Task.CompletedTask;
}

logWriter 只是一些日志记录 class 我曾经将信息写入 blob 存储。

它现在输出类似

的消息

Last processed seqnr for partition 3: 32780931 of 32823804 in consumergroup 'telemetry' (lag: 42873)

所以当延迟非常高时,您可能正在处理很久以前发生的事件。在这种情况下,您需要扩展 up/out 您的处理器。

如果您注意到延迟,您应该测量处理给定数量的项目需要多长时间。然后您可以尝试优化性能并查看它是否有所改善。我们这样做了:

public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
{
        try
        {
            stopwatch.Restart();

            // process items here

            stopwatch.Stop();

            await CheckPointAsync(context);

            logWriter.Write(
                $"Processed {events.Count()} events in {stopwatch.ElapsedMilliseconds}ms using partition {context.Lease.PartitionId} in consumergroup {context.ConsumerGroupName}.",
                EventLevel.Informational);
        }
}