如何在我的 ChangesHandler(Container.ChangesHandler<T>,Azure Cosmos v3)中获取 Change Feed 所监控集合的名称
How can I get the change feed's monitored collection name into my ChangesHandler - Container.ChangesHandler<T> (Azure Cosmos v3)
我有一项服务,会创建大量 Change Feed 来监控许多不同的 Cosmos DB 集合。在 v1 或 v2 中,观察者的 ProcessChangesAsync 方法会收到一个 IChangeFeedObserverContext,我可以从中提取集合名称。
/// <summary>
/// v1/v2 observer callback (the snippet is truncated in the original post). It extracts a name
/// from the feed response's content location and stores it in <c>observerCollection</c>.
/// </summary>
public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> deltas, CancellationToken cancellationToken)
{
string observerCollection = string.Empty;
try
{
// Matches the run of word characters that ends the input string.
Regex rx = new Regex(@"\b(\w+$)");
// Presumably ContentLocation ends with the collection name, so the last word is that name - TODO confirm.
observerCollection = rx.Match(context.FeedResponse.ContentLocation).Value.ToString();
在 v3 中,不再传入观察者类型,而是传入一个委托方法,其签名中不再包含上下文:
MS Docs Container.ChangesHandlerDelegate
/// <summary>
/// Builds a change feed processor that watches <paramref name="monitoringContainer"/> and
/// delivers each batch of changes to <c>HandleChangesAsync</c>.
/// </summary>
/// <param name="leasingContainer">Container handed to the builder as the lease container.</param>
/// <param name="monitoringContainer">Container whose change feed is processed.</param>
/// <param name="hostName">Value passed to the builder as the processor name.</param>
/// <returns>The built processor; this method does not start it.</returns>
private ChangeFeedProcessor ChangeFeedInitialize(Container leasingContainer, Container monitoringContainer, string hostName)
{
    // The handler is supplied as a method group, so it only sees the changes and the token.
    return monitoringContainer
        .GetChangeFeedProcessorBuilder<Document>(hostName, this.HandleChangesAsync)
        .WithInstanceName("isn")
        .WithLeaseContainer(leasingContainer)
        .Build();
}
/// <summary>
/// Change feed delegate: forwards a batch of changed documents to a new
/// <c>AnalyticsChangefeedProcessor</c> and logs any exception instead of rethrowing it.
/// </summary>
/// <param name="changes">The batch of changed documents.</param>
/// <param name="cancellationToken">Token supplied by the caller; not used by this method.</param>
private async Task HandleChangesAsync(IReadOnlyCollection<Document> changes, CancellationToken cancellationToken)
{
    ILogger logger = AnalyticsHelper.BuildMeMyLogger(this.loggerFactory);

    try
    {
        var processor = new AnalyticsChangefeedProcessor();

        // "CollectionName" is a fixed placeholder: this signature gives no access to the monitored container.
        await processor.HandleChangesAsync(
            changes,
            this.analyticsContext.DataLakeStorageProvider,
            "CollectionName",
            logger);
    }
    catch (Exception failure)
    {
        logger.LogFailure($"Failed to process changes: {failure.Message}", TagIds.ExceptionAnalytics, failure);
    }
}
在上面的代码中,我有一个创建 Change Feed 处理器的基本方法(由计时器触发),以及一个委托方法;该委托方法把处理工作转交给一个更大的类,由它根据被监控的集合采取相应的操作。
那么,如何才能把这个 Change Feed 所监控集合的名称传入 ChangesHandler?
你已经持有该容器的引用,可以通过 lambda 把它注入处理方法,或者直接引用它。
/// <summary>
/// Builds a change feed processor that watches <paramref name="monitoringContainer"/> and
/// passes that same container to <c>HandleChangesAsync</c> along with each batch of changes.
/// </summary>
/// <param name="leasingContainer">Container handed to the builder as the lease container.</param>
/// <param name="monitoringContainer">Container whose change feed is processed; captured by the handler lambda.</param>
/// <param name="hostName">Value passed to the builder as the processor name.</param>
/// <returns>The built processor; this method does not start it.</returns>
private ChangeFeedProcessor ChangeFeedInitialize(Container leasingContainer, Container monitoringContainer, string hostName)
{
    // The lambda closes over monitoringContainer, so the handler knows which container produced the changes.
    Container.ChangesHandler<Document> onChanges =
        (changes, cancellationToken) => this.HandleChangesAsync(monitoringContainer, changes, cancellationToken);

    return monitoringContainer
        .GetChangeFeedProcessorBuilder<Document>(hostName, onChanges)
        .WithInstanceName("isn")
        .WithLeaseContainer(leasingContainer)
        .Build();
}
/// <summary>
/// Change feed delegate target: forwards a batch of changed documents, together with the id of
/// the container they came from, to a new <c>AnalyticsChangefeedProcessor</c>. Any exception is
/// logged instead of rethrown.
/// </summary>
/// <param name="monitoringContainer">The monitored container that produced <paramref name="changes"/>.</param>
/// <param name="changes">The batch of changed documents.</param>
/// <param name="cancellationToken">Token supplied by the caller; not used by this method.</param>
private async Task HandleChangesAsync(Container monitoringContainer, IReadOnlyCollection<Document> changes, CancellationToken cancellationToken)
{
    ILogger logger = AnalyticsHelper.BuildMeMyLogger(this.loggerFactory);
    try
    {
        AnalyticsChangefeedProcessor changefeedProcessor = new AnalyticsChangefeedProcessor();

        // Pass the real container id rather than the "CollectionName" placeholder;
        // otherwise the injected container is never used.
        await changefeedProcessor.HandleChangesAsync(
            changes,
            this.analyticsContext.DataLakeStorageProvider,
            monitoringContainer.Id,
            logger);
    }
    catch (Exception ex)
    {
        logger.LogFailure($"Failed to process changes: {ex.Message}", TagIds.ExceptionAnalytics, ex);
    }
}
我有一项服务,会创建大量 Change Feed 来监控许多不同的 Cosmos DB 集合。在 v1 或 v2 中,观察者的 ProcessChangesAsync 方法会收到一个 IChangeFeedObserverContext,我可以从中提取集合名称。
/// <summary>
/// v1/v2 observer callback (the snippet is truncated in the original post). It extracts a name
/// from the feed response's content location and stores it in <c>observerCollection</c>.
/// </summary>
public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> deltas, CancellationToken cancellationToken)
{
string observerCollection = string.Empty;
try
{
// Matches the run of word characters that ends the input string.
Regex rx = new Regex(@"\b(\w+$)");
// Presumably ContentLocation ends with the collection name, so the last word is that name - TODO confirm.
observerCollection = rx.Match(context.FeedResponse.ContentLocation).Value.ToString();
在 v3 中,不再传入观察者类型,而是传入一个委托方法,其签名中不再包含上下文:
MS Docs Container.ChangesHandlerDelegate
/// <summary>
/// Builds a change feed processor that watches <paramref name="monitoringContainer"/> and
/// delivers each batch of changes to <c>HandleChangesAsync</c>.
/// </summary>
/// <param name="leasingContainer">Container handed to the builder as the lease container.</param>
/// <param name="monitoringContainer">Container whose change feed is processed.</param>
/// <param name="hostName">Value passed to the builder as the processor name.</param>
/// <returns>The built processor; this method does not start it.</returns>
private ChangeFeedProcessor ChangeFeedInitialize(Container leasingContainer, Container monitoringContainer, string hostName)
{
    // The handler is supplied as a method group, so it only sees the changes and the token.
    return monitoringContainer
        .GetChangeFeedProcessorBuilder<Document>(hostName, this.HandleChangesAsync)
        .WithInstanceName("isn")
        .WithLeaseContainer(leasingContainer)
        .Build();
}
/// <summary>
/// Change feed delegate: forwards a batch of changed documents to a new
/// <c>AnalyticsChangefeedProcessor</c> and logs any exception instead of rethrowing it.
/// </summary>
/// <param name="changes">The batch of changed documents.</param>
/// <param name="cancellationToken">Token supplied by the caller; not used by this method.</param>
private async Task HandleChangesAsync(IReadOnlyCollection<Document> changes, CancellationToken cancellationToken)
{
    ILogger logger = AnalyticsHelper.BuildMeMyLogger(this.loggerFactory);

    try
    {
        var processor = new AnalyticsChangefeedProcessor();

        // "CollectionName" is a fixed placeholder: this signature gives no access to the monitored container.
        await processor.HandleChangesAsync(
            changes,
            this.analyticsContext.DataLakeStorageProvider,
            "CollectionName",
            logger);
    }
    catch (Exception failure)
    {
        logger.LogFailure($"Failed to process changes: {failure.Message}", TagIds.ExceptionAnalytics, failure);
    }
}
在上面的代码中,我有一个创建 Change Feed 处理器的基本方法(由计时器触发),以及一个委托方法;该委托方法把处理工作转交给一个更大的类,由它根据被监控的集合采取相应的操作。
那么,如何才能把这个 Change Feed 所监控集合的名称传入 ChangesHandler?
你已经持有该容器的引用,可以通过 lambda 把它注入处理方法,或者直接引用它。
/// <summary>
/// Builds a change feed processor that watches <paramref name="monitoringContainer"/> and
/// passes that same container to <c>HandleChangesAsync</c> along with each batch of changes.
/// </summary>
/// <param name="leasingContainer">Container handed to the builder as the lease container.</param>
/// <param name="monitoringContainer">Container whose change feed is processed; captured by the handler lambda.</param>
/// <param name="hostName">Value passed to the builder as the processor name.</param>
/// <returns>The built processor; this method does not start it.</returns>
private ChangeFeedProcessor ChangeFeedInitialize(Container leasingContainer, Container monitoringContainer, string hostName)
{
    // The lambda closes over monitoringContainer, so the handler knows which container produced the changes.
    Container.ChangesHandler<Document> onChanges =
        (changes, cancellationToken) => this.HandleChangesAsync(monitoringContainer, changes, cancellationToken);

    return monitoringContainer
        .GetChangeFeedProcessorBuilder<Document>(hostName, onChanges)
        .WithInstanceName("isn")
        .WithLeaseContainer(leasingContainer)
        .Build();
}
/// <summary>
/// Change feed delegate target: forwards a batch of changed documents, together with the id of
/// the container they came from, to a new <c>AnalyticsChangefeedProcessor</c>. Any exception is
/// logged instead of rethrown.
/// </summary>
/// <param name="monitoringContainer">The monitored container that produced <paramref name="changes"/>.</param>
/// <param name="changes">The batch of changed documents.</param>
/// <param name="cancellationToken">Token supplied by the caller; not used by this method.</param>
private async Task HandleChangesAsync(Container monitoringContainer, IReadOnlyCollection<Document> changes, CancellationToken cancellationToken)
{
    ILogger logger = AnalyticsHelper.BuildMeMyLogger(this.loggerFactory);
    try
    {
        AnalyticsChangefeedProcessor changefeedProcessor = new AnalyticsChangefeedProcessor();

        // Pass the real container id rather than the "CollectionName" placeholder;
        // otherwise the injected container is never used.
        await changefeedProcessor.HandleChangesAsync(
            changes,
            this.analyticsContext.DataLakeStorageProvider,
            monitoringContainer.Id,
            logger);
    }
    catch (Exception ex)
    {
        logger.LogFailure($"Failed to process changes: {ex.Message}", TagIds.ExceptionAnalytics, ex);
    }
}