从 EventHub 获取数据延迟
Getting Data from EventHub is delayed
我在Azure中配置了一个EventHub,也是一个读取数据的消费组。它工作了几天。突然,我发现传入数据有延迟(大约 3 天)。我使用 Windows 服务来使用我服务器中的数据。我每分钟大约有 500 条传入消息。谁能帮我解决这个问题?
可能是您处理它们的速度太慢了。所以要做的工作越来越多,你就会落后。
要深入了解您在事件流中的位置,您可以使用如下代码:
private void LogProgressRecord(PartitionContext context)
{
if (namespaceManager == null)
return;
var currentSeqNo = context.Lease.SequenceNumber;
var lastSeqNo = namespaceManager.GetEventHubPartition(context.EventHubPath, context.ConsumerGroupName, context.Lease.PartitionId).EndSequenceNumber;
var delta = lastSeqNo - currentSeqNo;
logWriter.Write(
$"Last processed seqnr for partition {context.Lease.PartitionId}: {currentSeqNo} of {lastSeqNo} in consumergroup '{context.ConsumerGroupName}' (lag: {delta})",
EventLevel.Informational);
}
命名空间管理器是这样构建的:
namespaceManager = NamespaceManager.CreateFromConnectionString("Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=yyy;SharedAccessKey=zzz");
我在CloseAsync
方法中调用了这个日志记录方法:
public Task CloseAsync(PartitionContext context, CloseReason reason)
{
LogProgressRecord(context);
return Task.CompletedTask;
}
logWriter
只是一些日志记录 class 我曾经将信息写入 blob 存储。
它现在输出类似
的消息
Last processed seqnr for partition 3: 32780931 of 32823804 in consumergroup 'telemetry' (lag: 42873)
所以当延迟非常高时,您可能正在处理很久以前发生的事件。在这种情况下,您需要扩展 up/out 您的处理器。
如果您注意到延迟,您应该测量处理给定数量的项目需要多长时间。然后您可以尝试优化性能并查看它是否有所改善。我们这样做了:
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
{
try
{
stopwatch.Restart();
// process items here
stopwatch.Stop();
await CheckPointAsync(context);
logWriter.Write(
$"Processed {events.Count()} events in {stopwatch.ElapsedMilliseconds}ms using partition {context.Lease.PartitionId} in consumergroup {context.ConsumerGroupName}.",
EventLevel.Informational);
}
}
我在Azure中配置了一个EventHub,也是一个读取数据的消费组。它工作了几天。突然,我发现传入数据有延迟(大约 3 天)。我使用 Windows 服务来使用我服务器中的数据。我每分钟大约有 500 条传入消息。谁能帮我解决这个问题?
可能是您处理它们的速度太慢了。所以要做的工作越来越多,你就会落后。
要深入了解您在事件流中的位置,您可以使用如下代码:
private void LogProgressRecord(PartitionContext context)
{
if (namespaceManager == null)
return;
var currentSeqNo = context.Lease.SequenceNumber;
var lastSeqNo = namespaceManager.GetEventHubPartition(context.EventHubPath, context.ConsumerGroupName, context.Lease.PartitionId).EndSequenceNumber;
var delta = lastSeqNo - currentSeqNo;
logWriter.Write(
$"Last processed seqnr for partition {context.Lease.PartitionId}: {currentSeqNo} of {lastSeqNo} in consumergroup '{context.ConsumerGroupName}' (lag: {delta})",
EventLevel.Informational);
}
命名空间管理器是这样构建的:
namespaceManager = NamespaceManager.CreateFromConnectionString("Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=yyy;SharedAccessKey=zzz");
我在CloseAsync
方法中调用了这个日志记录方法:
public Task CloseAsync(PartitionContext context, CloseReason reason)
{
LogProgressRecord(context);
return Task.CompletedTask;
}
logWriter
只是一些日志记录 class 我曾经将信息写入 blob 存储。
它现在输出类似
的消息Last processed seqnr for partition 3: 32780931 of 32823804 in consumergroup 'telemetry' (lag: 42873)
所以当延迟非常高时,您可能正在处理很久以前发生的事件。在这种情况下,您需要扩展 up/out 您的处理器。
如果您注意到延迟,您应该测量处理给定数量的项目需要多长时间。然后您可以尝试优化性能并查看它是否有所改善。我们这样做了:
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
{
try
{
stopwatch.Restart();
// process items here
stopwatch.Stop();
await CheckPointAsync(context);
logWriter.Write(
$"Processed {events.Count()} events in {stopwatch.ElapsedMilliseconds}ms using partition {context.Lease.PartitionId} in consumergroup {context.ConsumerGroupName}.",
EventLevel.Informational);
}
}