How to set the EventProcessorHost to read events from now on (UTC)?
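We are using EventProcessorHost to receive events from Azure EventHubs. I have been trying, without success, to configure it (via EventProcessorOptions.InitialOffsetProvider) to read events from now (UTC) onward, but it always reads from the beginning of the feed. I am not saving checkpoints (I even deleted the BLOB container that was created).
This is how I set it up: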
DateTime startDate = DateTime.UtcNow;
var epo = new EventProcessorOptions
{
    MaxBatchSize = 100,
    PrefetchCount = 100,
    ReceiveTimeOut = TimeSpan.FromSeconds(120),
    InitialOffsetProvider = (name) => startDate
};
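Any guidance would be appreciated.
I found that the checkpoint folder in blob storage still existed; my application was honoring it and ignoring the date I set in EventProcessorOptions. After I deleted the container, it started working as expected (reading from the UTC date).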
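For anyone who wants to script the cleanup described above, here is a minimal sketch, assuming the Azure.Storage.Blobs package is available. The class name, method name, and both parameters are hypothetical placeholders, not part of the original setup. Stop the host before running it, and note that it discards every stored checkpoint and lease in that container.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Storage.Blobs;

public static class CheckpointReset
{
    // Deletes the blob container that EventProcessorHost uses for leases and
    // checkpoints, so that InitialOffsetProvider is consulted on the next start.
    // Both arguments are placeholders for your own values (assumption).
    public static async Task DeleteCheckpointContainerAsync(
        string storageConnectionString, string containerName)
    {
        var container = new BlobContainerClient(storageConnectionString, containerName);

        // The response value is true when the container existed and was deleted,
        // false when there was nothing to delete.
        var response = await container.DeleteIfExistsAsync();

        Console.WriteLine(response.Value
            ? $"Deleted container '{containerName}'."
            : $"Container '{containerName}' did not exist.");
    }
}
```

In my understanding, Azure Storage can reject re-creating a container with the same name for a short while after deletion, so the host may fail to start if it is launched immediately afterwards.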
You can use the EventProcessorOptions class for this and supply an offset set to the desired time.
var eventProcessorOptions = new EventProcessorOptions
{
    InitialOffsetProvider = (partitionId) => DateTime.UtcNow
};
Then you can use any of the RegisterEventProcessorAsync overloads that accept eventProcessorOptions.
I think this changed in version 2.0.0. Rajiv's code is now:
var eventProcessorOptions = new EventProcessorOptions
{
    InitialOffsetProvider = (partitionId) => EventPosition.FromEnqueuedTime(DateTime.UtcNow)
};
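One detail worth considering: the lambda is evaluated each time the host asks for a partition's starting position, so DateTime.UtcNow inside it can yield a slightly different instant per partition. The sketch below captures the timestamp once, as the question's startDate variable did, so every partition without a checkpoint starts from the same moment. The class and method names are hypothetical; it assumes the Microsoft.Azure.EventHubs.Processor package at 2.0.0 or later.

```csharp
using System;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;

public static class StartFromNowOptions
{
    // Builds options whose initial position is one fixed UTC instant,
    // captured when this method is called (hypothetical helper).
    public static EventProcessorOptions Create()
    {
        DateTime startUtc = DateTime.UtcNow;

        return new EventProcessorOptions
        {
            // Only consulted for partitions that have no stored checkpoint.
            InitialOffsetProvider = partitionId => EventPosition.FromEnqueuedTime(startUtc)
        };
    }
}
```

Whether a shared instant or a per-partition "now" is preferable depends on the application; if partitions are picked up much later, the inline DateTime.UtcNow version skips everything enqueued before that later moment.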
Here is a sample block with fully qualified class names:
    private static async Task MainAsync(string[] args)
    {
        try
        {
            Console.WriteLine("Registering EventProcessor...");

            // Build the Event Hubs connection string from its configured parts.
            string AISEhConnectionStringEndpoint = Configuration["AISEhConnectionStringEndpoint"];
            string AISEhConnectionStringSharedAccessKeyName = Configuration["AISEhConnectionStringSharedAccessKeyName"];
            string AISEhConnectionStringSharedAccessKey = Configuration["AISEhConnectionStringSharedAccessKey"];
            string EhConnectionString = $"Endpoint={AISEhConnectionStringEndpoint};SharedAccessKeyName={AISEhConnectionStringSharedAccessKeyName};SharedAccessKey={AISEhConnectionStringSharedAccessKey}";
            string AISEhEntityPath = Configuration["AISEhEntityPath"];
            string AISEhConsumerGroupName = Configuration["AISEhConsumerGroupName"];

            // Build the storage connection string used for leases and checkpoints.
            string AISStorageContainerName = Configuration["AISStorageContainerName"];
            string AISStorageAccountName = Configuration["AISStorageAccountName"];
            string AISStorageAccountKey = Configuration["AISStorageAccountKey"];
            string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", AISStorageAccountName, AISStorageAccountKey);

            var eventProcessorHost = new Microsoft.Azure.EventHubs.Processor.EventProcessorHost(
                AISEhEntityPath,
                AISEhConsumerGroupName,
                EhConnectionString,
                StorageConnectionString,
                AISStorageContainerName);

            // Start from events enqueued from now (UTC) onward; ignored for
            // partitions that already have a checkpoint in the storage container.
            var options = new Microsoft.Azure.EventHubs.Processor.EventProcessorOptions
            {
                InitialOffsetProvider = (partitionId) => Microsoft.Azure.EventHubs.EventPosition.FromEnqueuedTime(DateTime.UtcNow)
            };

            // Registers the Event Processor Host and starts receiving messages.
            // GetEvents is the application's own IEventProcessor implementation.
            await eventProcessorHost.RegisterEventProcessorAsync<GetEvents>(options);

            // Keeps the process alive indefinitely. As written, the unregister
            // call below is never reached.
            Thread.Sleep(Timeout.Infinite);

            // Disposes of the Event Processor Host
            await eventProcessorHost.UnregisterEventProcessorAsync();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
            NLog.LogManager.GetCurrentClassLogger().Error(ex);
            throw;
        }
    }
} // closes the enclosing class, whose declaration is not shown
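Here are my general settings, with secrets and exact addresses redacted, to help anyone troubleshooting, since I found sorting this out less enjoyable than pulling teeth: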
"AISEhConnectionStringEndpoint": "sb://<my bus address>.servicebus.windows.net/",
"AISEhConnectionStringSharedAccessKeyName": "<my key name>",
"AISEhConnectionStringSharedAccessKey": "<yeah nah>",
"AISEhEntityPath": "<Event Hub entity path>",
"AISEhConsumerGroupName": "<consumer group name e.g $Default>",
"AISStorageContainerName": "<storage container name>",
"AISStorageAccountName": "<storage account name>",
"AISStorageAccountKey": "<yeah nah>",